This is an automated email from the ASF dual-hosted git repository.

w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 66282a5b77e [fix](cloud) add metrics for FE to MS rpc (#60574)
66282a5b77e is described below

commit 66282a5b77e3e230709ce543c337d026101d189d
Author: meiyi <[email protected]>
AuthorDate: Thu Feb 12 10:30:58 2026 +0800

    [fix](cloud) add metrics for FE to MS rpc (#60574)
    
    Aggregate metrics across all RPCs:
    ```
    doris_fe_meta_service_rpc_all_per_second 0.625
    doris_fe_meta_service_rpc_all_total 12433
    doris_fe_meta_service_rpc_all_failed 0
    doris_fe_meta_service_rpc_all_retry 0
    ```
    
    Per-RPC metrics (using `getVersion` as an example):
    ```
    doris_fe_meta_service_rpc_total{method="getVersion"} 552
    doris_fe_meta_service_rpc_failed{method="getVersion"} 1
    doris_fe_meta_service_rpc_retry{method="getVersion"} 1
    doris_fe_meta_service_rpc_per_second{method="getVersion"} 0.0625
    doris_fe_meta_service_rpc_latency_ms{quantile="0.75",method="getVersion"} 27.0
    doris_fe_meta_service_rpc_latency_ms{quantile="0.95",method="getVersion"} 27.0
    doris_fe_meta_service_rpc_latency_ms{quantile="0.98",method="getVersion"} 27.0
    doris_fe_meta_service_rpc_latency_ms{quantile="0.99",method="getVersion"} 27.0
    doris_fe_meta_service_rpc_latency_ms{quantile="0.999",method="getVersion"} 92.0
    doris_fe_meta_service_rpc_latency_ms_sum{method="getVersion"} 10386.542007715485
    doris_fe_meta_service_rpc_latency_ms_count{method="getVersion"} 552
    ```
---
 .../apache/doris/cloud/rpc/MetaServiceProxy.java   | 200 +++++++++++++++------
 .../java/org/apache/doris/metric/CloudMetrics.java |  49 +++++
 .../org/apache/doris/metric/MetricCalculator.java  |  35 ++++
 .../java/org/apache/doris/metric/MetricRepo.java   |  10 ++
 .../doris/cloud/rpc/MetaServiceProxyTest.java      |   4 +-
 5 files changed, 239 insertions(+), 59 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 60bef07b638..e1cb45401db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -19,6 +19,8 @@ package org.apache.doris.cloud.rpc;
 
 import org.apache.doris.cloud.proto.Cloud;
 import org.apache.doris.common.Config;
+import org.apache.doris.metric.CloudMetrics;
+import org.apache.doris.metric.MetricRepo;
 import org.apache.doris.rpc.RpcException;
 
 import com.google.common.collect.Maps;
@@ -95,10 +97,28 @@ public class MetaServiceProxy {
 
     public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest 
request)
             throws RpcException {
+        long startTime = System.currentTimeMillis();
+        String methodName = "getInstance";
+        if (MetricRepo.isInit && Config.isCloudMode()) {
+            CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
+            
CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
+        }
+
         try {
             final MetaServiceClient client = getProxy();
-            return client.getInstance(request);
+            Cloud.GetInstanceResponse response = client.getInstance(request);
+            if (MetricRepo.isInit && Config.isCloudMode()) {
+                CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+                        .update(System.currentTimeMillis() - startTime);
+            }
+            return response;
         } catch (Exception e) {
+            if (MetricRepo.isInit && Config.isCloudMode()) {
+                CloudMetrics.META_SERVICE_RPC_ALL_FAILED.increase(1L);
+                
CloudMetrics.META_SERVICE_RPC_FAILED.getOrAdd(methodName).increase(1L);
+                CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+                        .update(System.currentTimeMillis() - startTime);
+            }
             throw new RpcException("", e.getMessage(), e);
         }
     }
@@ -174,13 +194,18 @@ public class MetaServiceProxy {
             this.proxy = proxy;
         }
 
-        public <Response> Response executeRequest(Function<MetaServiceClient, 
Response> function) throws RpcException {
+        public <Response> Response executeRequest(String methodName, 
Function<MetaServiceClient, Response> function)
+                throws RpcException {
             long maxRetries = Config.meta_service_rpc_retry_cnt;
             for (long tried = 1; tried <= maxRetries; tried++) {
                 MetaServiceClient client = null;
                 boolean requestFailed = false;
                 try {
                     client = proxy.getProxy();
+                    if (tried > 1 && MetricRepo.isInit && 
Config.isCloudMode()) {
+                        CloudMetrics.META_SERVICE_RPC_ALL_RETRY.increase(1L);
+                        
CloudMetrics.META_SERVICE_RPC_RETRY.getOrAdd(methodName).increase(1L);
+                    }
                     return function.apply(client);
                 } catch (StatusRuntimeException sre) {
                     requestFailed = true;
@@ -228,9 +253,47 @@ public class MetaServiceProxy {
 
     private final MetaServiceClientWrapper w = new 
MetaServiceClientWrapper(this);
 
+    /**
+     * Execute RPC with comprehensive metrics tracking.
+     * Tracks: total calls, failures, latency
+     */
+    private <Response> Response executeWithMetrics(String methodName, 
Function<MetaServiceClient, Response> function)
+            throws RpcException {
+        long startTime = System.currentTimeMillis();
+        if (MetricRepo.isInit && Config.isCloudMode()) {
+            CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
+            
CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
+        }
+
+        try {
+            Response response = w.executeRequest(methodName, function);
+            if (MetricRepo.isInit && Config.isCloudMode()) {
+                CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+                        .update(System.currentTimeMillis() - startTime);
+            }
+            return response;
+        } catch (RpcException e) {
+            if (MetricRepo.isInit && Config.isCloudMode()) {
+                CloudMetrics.META_SERVICE_RPC_ALL_FAILED.increase(1L);
+                
CloudMetrics.META_SERVICE_RPC_FAILED.getOrAdd(methodName).increase(1L);
+                CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+                        .update(System.currentTimeMillis() - startTime);
+            }
+            throw e;
+        }
+    }
+
     public Future<Cloud.GetVersionResponse> 
getVisibleVersionAsync(Cloud.GetVersionRequest request)
             throws RpcException {
+        long startTime = System.currentTimeMillis();
+        String methodName = "getVersion";
         MetaServiceClient client = null;
+
+        if (MetricRepo.isInit && Config.isCloudMode()) {
+            CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
+            
CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
+        }
+
         try {
             client = getProxy();
             Future<Cloud.GetVersionResponse> future = 
client.getVisibleVersionAsync(request);
@@ -242,10 +305,20 @@ public class MetaServiceProxy {
                         new 
com.google.common.util.concurrent.FutureCallback<Cloud.GetVersionResponse>() {
                             @Override
                             public void onSuccess(Cloud.GetVersionResponse 
result) {
+                                if (MetricRepo.isInit && Config.isCloudMode()) 
{
+                                    
CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+                                            .update(System.currentTimeMillis() 
- startTime);
+                                }
                             }
 
                             @Override
                             public void onFailure(Throwable t) {
+                                if (MetricRepo.isInit && Config.isCloudMode()) 
{
+                                    
CloudMetrics.META_SERVICE_RPC_ALL_FAILED.increase(1L);
+                                    
CloudMetrics.META_SERVICE_RPC_FAILED.getOrAdd(methodName).increase(1L);
+                                    
CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+                                            .update(System.currentTimeMillis() 
- startTime);
+                                }
                                 if (finalClient != null) {
                                     finalClient.shutdown(true);
                                 }
@@ -254,6 +327,12 @@ public class MetaServiceProxy {
             }
             return future;
         } catch (Exception e) {
+            if (MetricRepo.isInit && Config.isCloudMode()) {
+                CloudMetrics.META_SERVICE_RPC_ALL_FAILED.increase(1L);
+                
CloudMetrics.META_SERVICE_RPC_FAILED.getOrAdd(methodName).increase(1L);
+                CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+                        .update(System.currentTimeMillis() - startTime);
+            }
             if (client != null) {
                 client.shutdown(true);
             }
@@ -262,162 +341,164 @@ public class MetaServiceProxy {
     }
 
     public Cloud.GetVersionResponse getVersion(Cloud.GetVersionRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.getVersion(request));
+        return executeWithMetrics("getVersion", (client) -> 
client.getVersion(request));
     }
 
     public Cloud.CreateTabletsResponse 
createTablets(Cloud.CreateTabletsRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.createTablets(request));
+        return executeWithMetrics("createTablets", (client) -> 
client.createTablets(request));
     }
 
     public Cloud.UpdateTabletResponse updateTablet(Cloud.UpdateTabletRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.updateTablet(request));
+        return executeWithMetrics("updateTablet", (client) -> 
client.updateTablet(request));
     }
 
     public Cloud.BeginTxnResponse beginTxn(Cloud.BeginTxnRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.beginTxn(request));
+        return executeWithMetrics("beginTxn", (client) -> 
client.beginTxn(request));
     }
 
     public Cloud.PrecommitTxnResponse precommitTxn(Cloud.PrecommitTxnRequest 
request)
             throws RpcException {
-        return w.executeRequest((client) -> client.precommitTxn(request));
+        return executeWithMetrics("precommitTxn", (client) -> 
client.precommitTxn(request));
     }
 
     public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.commitTxn(request));
+        return executeWithMetrics("commitTxn", (client) -> 
client.commitTxn(request));
     }
 
     public Cloud.AbortTxnResponse abortTxn(Cloud.AbortTxnRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.abortTxn(request));
+        return executeWithMetrics("abortTxn", (client) -> 
client.abortTxn(request));
     }
 
     public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.getTxn(request));
+        return executeWithMetrics("getTxn", (client) -> 
client.getTxn(request));
     }
 
     public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.getTxnId(request));
+        return executeWithMetrics("getTxnId", (client) -> 
client.getTxnId(request));
     }
 
     public Cloud.GetCurrentMaxTxnResponse 
getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> 
client.getCurrentMaxTxnId(request));
+        return executeWithMetrics("getCurrentMaxTxnId", (client) -> 
client.getCurrentMaxTxnId(request));
     }
 
     public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest 
request)
             throws RpcException {
-        return w.executeRequest((client) -> client.beginSubTxn(request));
+        return executeWithMetrics("beginSubTxn", (client) -> 
client.beginSubTxn(request));
     }
 
     public Cloud.AbortSubTxnResponse abortSubTxn(Cloud.AbortSubTxnRequest 
request)
             throws RpcException {
-        return w.executeRequest((client) -> client.abortSubTxn(request));
+        return executeWithMetrics("abortSubTxn", (client) -> 
client.abortSubTxn(request));
     }
 
     public Cloud.CheckTxnConflictResponse 
checkTxnConflict(Cloud.CheckTxnConflictRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.checkTxnConflict(request));
+        return executeWithMetrics("checkTxnConflict", (client) -> 
client.checkTxnConflict(request));
     }
 
     public Cloud.CleanTxnLabelResponse 
cleanTxnLabel(Cloud.CleanTxnLabelRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.cleanTxnLabel(request));
+        return executeWithMetrics("cleanTxnLabel", (client) -> 
client.cleanTxnLabel(request));
     }
 
     public Cloud.GetClusterResponse getCluster(Cloud.GetClusterRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.getCluster(request));
+        return executeWithMetrics("getCluster", (client) -> 
client.getCluster(request));
     }
 
     public Cloud.IndexResponse prepareIndex(Cloud.IndexRequest request) throws 
RpcException {
-        return w.executeRequest((client) -> client.prepareIndex(request));
+        return executeWithMetrics("prepareIndex", (client) -> 
client.prepareIndex(request));
     }
 
     public Cloud.IndexResponse commitIndex(Cloud.IndexRequest request) throws 
RpcException {
-        return w.executeRequest((client) -> client.commitIndex(request));
+        return executeWithMetrics("commitIndex", (client) -> 
client.commitIndex(request));
     }
 
     public Cloud.CheckKVResponse checkKv(Cloud.CheckKVRequest request) throws 
RpcException {
-        return w.executeRequest((client) -> client.checkKv(request));
+        return executeWithMetrics("checkKv", (client) -> 
client.checkKv(request));
     }
 
     public Cloud.IndexResponse dropIndex(Cloud.IndexRequest request) throws 
RpcException {
-        return w.executeRequest((client) -> client.dropIndex(request));
+        return executeWithMetrics("dropIndex", (client) -> 
client.dropIndex(request));
     }
 
     public Cloud.PartitionResponse preparePartition(Cloud.PartitionRequest 
request)
             throws RpcException {
-        return w.executeRequest((client) -> client.preparePartition(request));
+        return executeWithMetrics("preparePartition", (client) -> 
client.preparePartition(request));
     }
 
     public Cloud.PartitionResponse commitPartition(Cloud.PartitionRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.commitPartition(request));
+        return executeWithMetrics("commitPartition", (client) -> 
client.commitPartition(request));
     }
 
     public Cloud.PartitionResponse dropPartition(Cloud.PartitionRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.dropPartition(request));
+        return executeWithMetrics("dropPartition", (client) -> 
client.dropPartition(request));
     }
 
     public Cloud.GetTabletStatsResponse 
getTabletStats(Cloud.GetTabletStatsRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.getTabletStats(request));
+        return executeWithMetrics("getTabletStats", (client) -> 
client.getTabletStats(request));
     }
 
     public Cloud.CreateStageResponse createStage(Cloud.CreateStageRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.createStage(request));
+        return executeWithMetrics("createStage", (client) -> 
client.createStage(request));
     }
 
     public Cloud.GetStageResponse getStage(Cloud.GetStageRequest request) 
throws RpcException {
-        return w.executeRequest((client) -> client.getStage(request));
+        return executeWithMetrics("getStage", (client) -> 
client.getStage(request));
     }
 
     public Cloud.DropStageResponse dropStage(Cloud.DropStageRequest request) 
throws RpcException {
-        return w.executeRequest((client) -> client.dropStage(request));
+        return executeWithMetrics("dropStage", (client) -> 
client.dropStage(request));
     }
 
     public Cloud.GetIamResponse getIam(Cloud.GetIamRequest request) throws 
RpcException {
-        return w.executeRequest((client) -> client.getIam(request));
+        return executeWithMetrics("getIam", (client) -> 
client.getIam(request));
     }
 
     public Cloud.BeginCopyResponse beginCopy(Cloud.BeginCopyRequest request) 
throws RpcException {
-        return w.executeRequest((client) -> client.beginCopy(request));
+        return executeWithMetrics("beginCopy", (client) -> 
client.beginCopy(request));
     }
 
     public Cloud.FinishCopyResponse finishCopy(Cloud.FinishCopyRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.finishCopy(request));
+        return executeWithMetrics("finishCopy", (client) -> 
client.finishCopy(request));
     }
 
     public Cloud.GetCopyJobResponse getCopyJob(Cloud.GetCopyJobRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.getCopyJob(request));
+        return executeWithMetrics("getCopyJob", (client) -> 
client.getCopyJob(request));
     }
 
     public Cloud.GetCopyFilesResponse getCopyFiles(Cloud.GetCopyFilesRequest 
request)
             throws RpcException {
-        return w.executeRequest((client) -> client.getCopyFiles(request));
+        return executeWithMetrics("getCopyFiles", (client) -> 
client.getCopyFiles(request));
     }
 
     public Cloud.FilterCopyFilesResponse 
filterCopyFiles(Cloud.FilterCopyFilesRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.filterCopyFiles(request));
+        return executeWithMetrics("filterCopyFiles", (client) -> 
client.filterCopyFiles(request));
     }
 
     public Cloud.AlterClusterResponse alterCluster(Cloud.AlterClusterRequest 
request)
             throws RpcException {
-        return w.executeRequest((client) -> client.alterCluster(request));
+        return executeWithMetrics("alterCluster", (client) -> 
client.alterCluster(request));
     }
 
     public Cloud.GetDeleteBitmapUpdateLockResponse getDeleteBitmapUpdateLock(
             Cloud.GetDeleteBitmapUpdateLockRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> 
client.getDeleteBitmapUpdateLock(request));
+        return executeWithMetrics("getDeleteBitmapUpdateLock",
+                (client) -> client.getDeleteBitmapUpdateLock(request));
     }
 
     public Cloud.RemoveDeleteBitmapUpdateLockResponse 
removeDeleteBitmapUpdateLock(
             Cloud.RemoveDeleteBitmapUpdateLockRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> 
client.removeDeleteBitmapUpdateLock(request));
+        return executeWithMetrics("removeDeleteBitmapUpdateLock",
+                (client) -> client.removeDeleteBitmapUpdateLock(request));
     }
 
     /**
@@ -426,93 +507,98 @@ public class MetaServiceProxy {
     @Deprecated
     public Cloud.AlterObjStoreInfoResponse 
alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.alterObjStoreInfo(request));
+        return executeWithMetrics("alterObjStoreInfo", (client) -> 
client.alterObjStoreInfo(request));
     }
 
     public Cloud.AlterObjStoreInfoResponse 
alterStorageVault(Cloud.AlterObjStoreInfoRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.alterStorageVault(request));
+        return executeWithMetrics("alterStorageVault", (client) -> 
client.alterStorageVault(request));
     }
 
     public Cloud.FinishTabletJobResponse 
finishTabletJob(Cloud.FinishTabletJobRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.finishTabletJob(request));
+        return executeWithMetrics("finishTabletJob", (client) -> 
client.finishTabletJob(request));
     }
 
     public Cloud.GetRLTaskCommitAttachResponse
             getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> 
client.getRLTaskCommitAttach(request));
+        return executeWithMetrics("getRLTaskCommitAttach",
+                (client) -> client.getRLTaskCommitAttach(request));
     }
 
     public Cloud.ResetRLProgressResponse 
resetRLProgress(Cloud.ResetRLProgressRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> client.resetRLProgress(request));
+        return executeWithMetrics("resetRLProgress", (client) -> 
client.resetRLProgress(request));
     }
 
     public Cloud.ResetStreamingJobOffsetResponse 
resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> 
client.resetStreamingJobOffset(request));
+        return executeWithMetrics("resetStreamingJobOffset",
+                (client) -> client.resetStreamingJobOffset(request));
     }
 
     public Cloud.GetObjStoreInfoResponse
             getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws 
RpcException {
-        return w.executeRequest((client) -> client.getObjStoreInfo(request));
+        return executeWithMetrics("getObjStoreInfo", (client) -> 
client.getObjStoreInfo(request));
     }
 
     public Cloud.AbortTxnWithCoordinatorResponse
             abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> 
client.abortTxnWithCoordinator(request));
+        return executeWithMetrics("abortTxnWithCoordinator",
+                (client) -> client.abortTxnWithCoordinator(request));
     }
 
     public Cloud.GetPrepareTxnByCoordinatorResponse
             getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> 
client.getPrepareTxnByCoordinator(request));
+        return executeWithMetrics("getPrepareTxnByCoordinator",
+                (client) -> client.getPrepareTxnByCoordinator(request));
     }
 
     public Cloud.CreateInstanceResponse 
createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.createInstance(request));
+        return executeWithMetrics("createInstance", (client) -> 
client.createInstance(request));
     }
 
     public Cloud.GetStreamingTaskCommitAttachResponse 
getStreamingTaskCommitAttach(
             Cloud.GetStreamingTaskCommitAttachRequest request) throws 
RpcException {
-        return w.executeRequest((client) -> 
client.getStreamingTaskCommitAttach(request));
+        return executeWithMetrics("getStreamingTaskCommitAttach",
+                (client) -> client.getStreamingTaskCommitAttach(request));
     }
 
     public Cloud.DeleteStreamingJobResponse 
deleteStreamingJob(Cloud.DeleteStreamingJobRequest request)
             throws RpcException {
-        return w.executeRequest((client) -> 
client.deleteStreamingJob(request));
+        return executeWithMetrics("deleteStreamingJob", (client) -> 
client.deleteStreamingJob(request));
     }
 
     public Cloud.AlterInstanceResponse 
alterInstance(Cloud.AlterInstanceRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.alterInstance(request));
+        return executeWithMetrics("alterInstance", (client) -> 
client.alterInstance(request));
     }
 
     public Cloud.BeginSnapshotResponse 
beginSnapshot(Cloud.BeginSnapshotRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.beginSnapshot(request));
+        return executeWithMetrics("beginSnapshot", (client) -> 
client.beginSnapshot(request));
     }
 
     public Cloud.UpdateSnapshotResponse 
updateSnapshot(Cloud.UpdateSnapshotRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.updateSnapshot(request));
+        return executeWithMetrics("updateSnapshot", (client) -> 
client.updateSnapshot(request));
     }
 
     public Cloud.CommitSnapshotResponse 
commitSnapshot(Cloud.CommitSnapshotRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.commitSnapshot(request));
+        return executeWithMetrics("commitSnapshot", (client) -> 
client.commitSnapshot(request));
     }
 
     public Cloud.AbortSnapshotResponse 
abortSnapshot(Cloud.AbortSnapshotRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.abortSnapshot(request));
+        return executeWithMetrics("abortSnapshot", (client) -> 
client.abortSnapshot(request));
     }
 
     public Cloud.ListSnapshotResponse listSnapshot(Cloud.ListSnapshotRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.listSnapshot(request));
+        return executeWithMetrics("listSnapshot", (client) -> 
client.listSnapshot(request));
     }
 
     public Cloud.DropSnapshotResponse dropSnapshot(Cloud.DropSnapshotRequest 
request) throws RpcException {
-        return w.executeRequest((client) -> client.dropSnapshot(request));
+        return executeWithMetrics("dropSnapshot", (client) -> 
client.dropSnapshot(request));
     }
 
     public Cloud.CloneInstanceResponse 
cloneInstance(Cloud.CloneInstanceRequest request) throws RpcException {
-        return w.executeRequest((client) -> client.cloneInstance(request));
+        return executeWithMetrics("cloneInstance", (client) -> 
client.cloneInstance(request));
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java
index 6877aea5c50..4a1824d0256 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java
@@ -51,6 +51,19 @@ public class CloudMetrics {
     protected static AutoMappedMetric<LongCounterMetric> 
CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM;
     protected static AutoMappedMetric<LongCounterMetric> 
CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM;
 
+    // Per-method meta-service RPC metrics
+    public static AutoMappedMetric<LongCounterMetric> META_SERVICE_RPC_TOTAL;
+    public static AutoMappedMetric<LongCounterMetric> META_SERVICE_RPC_FAILED;
+    public static AutoMappedMetric<LongCounterMetric> META_SERVICE_RPC_RETRY;
+    public static AutoMappedMetric<GaugeMetricImpl<Double>> 
META_SERVICE_RPC_PER_SECOND;
+    public static AutoMappedMetric<Histogram> META_SERVICE_RPC_LATENCY;
+
+    // Aggregate meta-service metrics
+    public static LongCounterMetric META_SERVICE_RPC_ALL_TOTAL;
+    public static LongCounterMetric META_SERVICE_RPC_ALL_FAILED;
+    public static LongCounterMetric META_SERVICE_RPC_ALL_RETRY;
+    public static GaugeMetricImpl<Double> META_SERVICE_RPC_ALL_PER_SECOND;
+
     protected static void init() {
         if (Config.isNotCloudMode()) {
             return;
@@ -124,5 +137,41 @@ public class CloudMetrics {
         CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM = new AutoMappedMetric<>(name 
-> new LongCounterMetric(
             "cloud_warm_up_balance_num", MetricUnit.NOUNIT,
             "current cluster cloud warm up cache sync edit log number"));
+
+        // Per-method meta-service RPC metrics
+        META_SERVICE_RPC_TOTAL = MetricRepo.addLabeledMetrics("method", () ->
+            new LongCounterMetric("meta_service_rpc_total", MetricUnit.NOUNIT,
+                "total meta service RPC calls"));
+        META_SERVICE_RPC_FAILED = MetricRepo.addLabeledMetrics("method", () ->
+            new LongCounterMetric("meta_service_rpc_failed", MetricUnit.NOUNIT,
+                "failed meta service RPC calls"));
+        META_SERVICE_RPC_RETRY = MetricRepo.addLabeledMetrics("method", () ->
+            new LongCounterMetric("meta_service_rpc_retry", MetricUnit.NOUNIT,
+                "meta service RPC retry attempts"));
+        META_SERVICE_RPC_PER_SECOND = new AutoMappedMetric<>(methodName -> {
+            GaugeMetricImpl<Double> gauge = new 
GaugeMetricImpl<>("meta_service_rpc_per_second",
+                    MetricUnit.NOUNIT, "meta service RPC requests per second", 
0.0);
+            gauge.addLabel(new MetricLabel("method", methodName));
+            return gauge;
+        });
+        META_SERVICE_RPC_LATENCY = new AutoMappedMetric<>(methodName -> {
+            String metricName = MetricRegistry.name("meta_service", "rpc", 
"latency", "ms",
+                    "method=" + methodName);
+            return MetricRepo.METRIC_REGISTER.histogram(metricName);
+        });
+
+        // Aggregate meta-service metrics
+        META_SERVICE_RPC_ALL_TOTAL = new 
LongCounterMetric("meta_service_rpc_all_total",
+            MetricUnit.NOUNIT, "total meta service RPC calls across all 
methods");
+        
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(META_SERVICE_RPC_ALL_TOTAL);
+        META_SERVICE_RPC_ALL_FAILED = new 
LongCounterMetric("meta_service_rpc_all_failed",
+            MetricUnit.NOUNIT, "total failed meta service RPC calls");
+        
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(META_SERVICE_RPC_ALL_FAILED);
+        META_SERVICE_RPC_ALL_RETRY = new 
LongCounterMetric("meta_service_rpc_all_retry",
+            MetricUnit.NOUNIT, "total meta service RPC retry attempts");
+        
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(META_SERVICE_RPC_ALL_RETRY);
+        META_SERVICE_RPC_ALL_PER_SECOND = new 
GaugeMetricImpl<>("meta_service_rpc_all_per_second",
+            MetricUnit.NOUNIT, "meta service RPC requests per second (all 
methods)", 0.0);
+        
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(META_SERVICE_RPC_ALL_PER_SECOND);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java 
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
index 06d206377d7..d15b3296efd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
@@ -34,10 +34,12 @@ public class MetricCalculator extends TimerTask {
     private long lastRequestCounter = -1;
     private long lastQueryErrCounter = -1;
     private long lastQuerySlowCounter = -1;
+    private long lastMetaServiceRpcCounter = -1;
 
     private Map<String, Long> clusterLastRequestCounter = new HashMap<>();
     private Map<String, Long> clusterLastQueryCounter = new HashMap<>();
     private Map<String, Long> clusterLastQueryErrCounter = new HashMap<>();
+    private Map<String, Long> metaServiceLastRpcCounter = new HashMap<>();
 
     @Override
     public void run() {
@@ -52,6 +54,9 @@ public class MetricCalculator extends TimerTask {
             lastRequestCounter = MetricRepo.COUNTER_REQUEST_ALL.getValue();
             lastQueryErrCounter = MetricRepo.COUNTER_QUERY_ERR.getValue();
             lastQuerySlowCounter = MetricRepo.COUNTER_QUERY_SLOW.getValue();
+            if (Config.isCloudMode()) {
+                lastMetaServiceRpcCounter = CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.getValue();
+            }
             initCloudMetrics();
             return;
         }
@@ -82,6 +87,14 @@ public class MetricCalculator extends TimerTask {
         MetricRepo.GAUGE_QUERY_SLOW_RATE.setValue(slowRate < 0 ? 0.0 : slowRate);
         lastQuerySlowCounter = currentSlowCounter;
 
+        // Calculate aggregate meta-service RPS
+        if (Config.isCloudMode()) {
+            long currentMetaServiceCounter = CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.getValue();
+            double metaServiceRps = (double) (currentMetaServiceCounter - lastMetaServiceRpcCounter) / interval;
+            CloudMetrics.META_SERVICE_RPC_ALL_PER_SECOND.setValue(metaServiceRps < 0 ? 0.0 : metaServiceRps);
+            lastMetaServiceRpcCounter = currentMetaServiceCounter;
+        }
+
         updateCloudMetrics(interval);
         lastTs = currentTs;
 
@@ -123,6 +136,15 @@ public class MetricCalculator extends TimerTask {
                 MetricRepo.DORIS_METRIC_REGISTER.addMetrics(metric);
             });
         }
+
+        // Initialize meta-service RPC metrics
+        Map<String, LongCounterMetric> metaServiceRpcMetrics = CloudMetrics.META_SERVICE_RPC_TOTAL.getMetrics();
+        if (metaServiceRpcMetrics != null) {
+            metaServiceRpcMetrics.forEach((methodName, metric) -> {
+                metaServiceLastRpcCounter.put(methodName, metric.getValue());
+                MetricRepo.DORIS_METRIC_REGISTER.addMetrics(metric);
+            });
+        }
     }
 
     private void updateCloudMetrics(long interval) {
@@ -165,5 +187,18 @@ public class MetricCalculator extends TimerTask {
                 clusterLastQueryErrCounter.put(clusterId, metric.getValue());
             });
         }
+
+        // Update meta-service per-method RPS
+        Map<String, LongCounterMetric> metaServiceRpcMetrics = CloudMetrics.META_SERVICE_RPC_TOTAL.getMetrics();
+        if (metaServiceRpcMetrics != null) {
+            metaServiceRpcMetrics.forEach((methodName, metric) -> {
+                double rps = (double) (metric.getValue() - metaServiceLastRpcCounter.getOrDefault(methodName, 0L))
+                        / interval;
+                rps = Double.max(rps, 0);
+                MetricRepo.updateMetaServiceRpcPerSecond(methodName, rps, metric.getLabels());
+                MetricRepo.DORIS_METRIC_REGISTER.addMetrics(metric);
+                metaServiceLastRpcCounter.put(methodName, metric.getValue());
+            });
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index e341bf80759..86ad7fe7dfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -1602,6 +1602,16 @@ public final class MetricRepo {
         MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
     }
 
+    public static void updateMetaServiceRpcPerSecond(String methodName, double value, List<MetricLabel> labels) {
+        if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(methodName)) {
+            return;
+        }
+        GaugeMetricImpl<Double> gauge = CloudMetrics.META_SERVICE_RPC_PER_SECOND.getOrAdd(methodName);
+        gauge.setValue(value);
+        gauge.setLabels(labels);
+        MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
+    }
+
     public static void updateClusterBackendAlive(String clusterName, String clusterId, String ipAddress,
             boolean alive) {
         if (!MetricRepo.isInit || Config.isNotCloudMode() || Strings.isNullOrEmpty(clusterName)
diff --git a/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
index 2d5a4353265..876170e6729 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
@@ -71,7 +71,7 @@ public class MetaServiceProxyTest {
         lastConnTimeMs.add(0L);
 
         MetaServiceProxy.MetaServiceClientWrapper wrapper = Deencapsulation.getField(proxy, "w");
-        String response = wrapper.executeRequest((ignored) -> "ok");
+        String response = wrapper.executeRequest("ignored", (ignored) -> "ok");
         Assert.assertEquals("ok", response);
         Mockito.verify(client, Mockito.never()).shutdown(Mockito.anyBoolean());
     }
@@ -93,7 +93,7 @@ public class MetaServiceProxyTest {
 
         MetaServiceProxy.MetaServiceClientWrapper wrapper = Deencapsulation.getField(proxy, "w");
         try {
-            wrapper.executeRequest((ignored) -> {
+            wrapper.executeRequest("ignored", (ignored) -> {
                 throw new RuntimeException("rpc failed");
             });
             Assert.fail("should throw RpcException");


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to