This is an automated email from the ASF dual-hosted git repository.
w41ter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 66282a5b77e [fix](cloud) add metrics for FE to MS rpc (#60574)
66282a5b77e is described below
commit 66282a5b77e3e230709ce543c337d026101d189d
Author: meiyi <[email protected]>
AuthorDate: Thu Feb 12 10:30:58 2026 +0800
[fix](cloud) add metrics for FE to MS rpc (#60574)
Aggregate RPC metrics (across all methods):
```
doris_fe_meta_service_rpc_all_per_second 0.625
doris_fe_meta_service_rpc_all_total 12433
doris_fe_meta_service_rpc_all_failed 0
doris_fe_meta_service_rpc_all_retry 0
```
Per-method RPC metrics (using `getVersion` as an example):
```
doris_fe_meta_service_rpc_total{method="getVersion"} 552
doris_fe_meta_service_rpc_failed{method="getVersion"} 1
doris_fe_meta_service_rpc_retry{method="getVersion"} 1
doris_fe_meta_service_rpc_per_second{method="getVersion"} 0.0625
doris_fe_meta_service_rpc_latency_ms{quantile="0.75",method="getVersion"} 27.0
doris_fe_meta_service_rpc_latency_ms{quantile="0.95",method="getVersion"} 27.0
doris_fe_meta_service_rpc_latency_ms{quantile="0.98",method="getVersion"} 27.0
doris_fe_meta_service_rpc_latency_ms{quantile="0.99",method="getVersion"} 27.0
doris_fe_meta_service_rpc_latency_ms{quantile="0.999",method="getVersion"} 92.0
doris_fe_meta_service_rpc_latency_ms_sum{method="getVersion"} 10386.542007715485
doris_fe_meta_service_rpc_latency_ms_count{method="getVersion"} 552
```
---
.../apache/doris/cloud/rpc/MetaServiceProxy.java | 200 +++++++++++++++------
.../java/org/apache/doris/metric/CloudMetrics.java | 49 +++++
.../org/apache/doris/metric/MetricCalculator.java | 35 ++++
.../java/org/apache/doris/metric/MetricRepo.java | 10 ++
.../doris/cloud/rpc/MetaServiceProxyTest.java | 4 +-
5 files changed, 239 insertions(+), 59 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
index 60bef07b638..e1cb45401db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/rpc/MetaServiceProxy.java
@@ -19,6 +19,8 @@ package org.apache.doris.cloud.rpc;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.common.Config;
+import org.apache.doris.metric.CloudMetrics;
+import org.apache.doris.metric.MetricRepo;
import org.apache.doris.rpc.RpcException;
import com.google.common.collect.Maps;
@@ -95,10 +97,28 @@ public class MetaServiceProxy {
public Cloud.GetInstanceResponse getInstance(Cloud.GetInstanceRequest
request)
throws RpcException {
+ long startTime = System.currentTimeMillis();
+ String methodName = "getInstance";
+ if (MetricRepo.isInit && Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
+
CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
+ }
+
try {
final MetaServiceClient client = getProxy();
- return client.getInstance(request);
+ Cloud.GetInstanceResponse response = client.getInstance(request);
+ if (MetricRepo.isInit && Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+ .update(System.currentTimeMillis() - startTime);
+ }
+ return response;
} catch (Exception e) {
+ if (MetricRepo.isInit && Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_ALL_FAILED.increase(1L);
+
CloudMetrics.META_SERVICE_RPC_FAILED.getOrAdd(methodName).increase(1L);
+ CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+ .update(System.currentTimeMillis() - startTime);
+ }
throw new RpcException("", e.getMessage(), e);
}
}
@@ -174,13 +194,18 @@ public class MetaServiceProxy {
this.proxy = proxy;
}
- public <Response> Response executeRequest(Function<MetaServiceClient,
Response> function) throws RpcException {
+ public <Response> Response executeRequest(String methodName,
Function<MetaServiceClient, Response> function)
+ throws RpcException {
long maxRetries = Config.meta_service_rpc_retry_cnt;
for (long tried = 1; tried <= maxRetries; tried++) {
MetaServiceClient client = null;
boolean requestFailed = false;
try {
client = proxy.getProxy();
+ if (tried > 1 && MetricRepo.isInit &&
Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_ALL_RETRY.increase(1L);
+
CloudMetrics.META_SERVICE_RPC_RETRY.getOrAdd(methodName).increase(1L);
+ }
return function.apply(client);
} catch (StatusRuntimeException sre) {
requestFailed = true;
@@ -228,9 +253,47 @@ public class MetaServiceProxy {
private final MetaServiceClientWrapper w = new
MetaServiceClientWrapper(this);
+ /**
+ * Execute RPC with comprehensive metrics tracking.
+ * Tracks: total calls, failures, latency
+ */
+ private <Response> Response executeWithMetrics(String methodName,
Function<MetaServiceClient, Response> function)
+ throws RpcException {
+ long startTime = System.currentTimeMillis();
+ if (MetricRepo.isInit && Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
+
CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
+ }
+
+ try {
+ Response response = w.executeRequest(methodName, function);
+ if (MetricRepo.isInit && Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+ .update(System.currentTimeMillis() - startTime);
+ }
+ return response;
+ } catch (RpcException e) {
+ if (MetricRepo.isInit && Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_ALL_FAILED.increase(1L);
+
CloudMetrics.META_SERVICE_RPC_FAILED.getOrAdd(methodName).increase(1L);
+ CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+ .update(System.currentTimeMillis() - startTime);
+ }
+ throw e;
+ }
+ }
+
public Future<Cloud.GetVersionResponse>
getVisibleVersionAsync(Cloud.GetVersionRequest request)
throws RpcException {
+ long startTime = System.currentTimeMillis();
+ String methodName = "getVersion";
MetaServiceClient client = null;
+
+ if (MetricRepo.isInit && Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.increase(1L);
+
CloudMetrics.META_SERVICE_RPC_TOTAL.getOrAdd(methodName).increase(1L);
+ }
+
try {
client = getProxy();
Future<Cloud.GetVersionResponse> future =
client.getVisibleVersionAsync(request);
@@ -242,10 +305,20 @@ public class MetaServiceProxy {
new
com.google.common.util.concurrent.FutureCallback<Cloud.GetVersionResponse>() {
@Override
public void onSuccess(Cloud.GetVersionResponse
result) {
+ if (MetricRepo.isInit && Config.isCloudMode())
{
+
CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+ .update(System.currentTimeMillis()
- startTime);
+ }
}
@Override
public void onFailure(Throwable t) {
+ if (MetricRepo.isInit && Config.isCloudMode())
{
+
CloudMetrics.META_SERVICE_RPC_ALL_FAILED.increase(1L);
+
CloudMetrics.META_SERVICE_RPC_FAILED.getOrAdd(methodName).increase(1L);
+
CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+ .update(System.currentTimeMillis()
- startTime);
+ }
if (finalClient != null) {
finalClient.shutdown(true);
}
@@ -254,6 +327,12 @@ public class MetaServiceProxy {
}
return future;
} catch (Exception e) {
+ if (MetricRepo.isInit && Config.isCloudMode()) {
+ CloudMetrics.META_SERVICE_RPC_ALL_FAILED.increase(1L);
+
CloudMetrics.META_SERVICE_RPC_FAILED.getOrAdd(methodName).increase(1L);
+ CloudMetrics.META_SERVICE_RPC_LATENCY.getOrAdd(methodName)
+ .update(System.currentTimeMillis() - startTime);
+ }
if (client != null) {
client.shutdown(true);
}
@@ -262,162 +341,164 @@ public class MetaServiceProxy {
}
public Cloud.GetVersionResponse getVersion(Cloud.GetVersionRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.getVersion(request));
+ return executeWithMetrics("getVersion", (client) ->
client.getVersion(request));
}
public Cloud.CreateTabletsResponse
createTablets(Cloud.CreateTabletsRequest request) throws RpcException {
- return w.executeRequest((client) -> client.createTablets(request));
+ return executeWithMetrics("createTablets", (client) ->
client.createTablets(request));
}
public Cloud.UpdateTabletResponse updateTablet(Cloud.UpdateTabletRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.updateTablet(request));
+ return executeWithMetrics("updateTablet", (client) ->
client.updateTablet(request));
}
public Cloud.BeginTxnResponse beginTxn(Cloud.BeginTxnRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.beginTxn(request));
+ return executeWithMetrics("beginTxn", (client) ->
client.beginTxn(request));
}
public Cloud.PrecommitTxnResponse precommitTxn(Cloud.PrecommitTxnRequest
request)
throws RpcException {
- return w.executeRequest((client) -> client.precommitTxn(request));
+ return executeWithMetrics("precommitTxn", (client) ->
client.precommitTxn(request));
}
public Cloud.CommitTxnResponse commitTxn(Cloud.CommitTxnRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.commitTxn(request));
+ return executeWithMetrics("commitTxn", (client) ->
client.commitTxn(request));
}
public Cloud.AbortTxnResponse abortTxn(Cloud.AbortTxnRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.abortTxn(request));
+ return executeWithMetrics("abortTxn", (client) ->
client.abortTxn(request));
}
public Cloud.GetTxnResponse getTxn(Cloud.GetTxnRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.getTxn(request));
+ return executeWithMetrics("getTxn", (client) ->
client.getTxn(request));
}
public Cloud.GetTxnIdResponse getTxnId(Cloud.GetTxnIdRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.getTxnId(request));
+ return executeWithMetrics("getTxnId", (client) ->
client.getTxnId(request));
}
public Cloud.GetCurrentMaxTxnResponse
getCurrentMaxTxnId(Cloud.GetCurrentMaxTxnRequest request)
throws RpcException {
- return w.executeRequest((client) ->
client.getCurrentMaxTxnId(request));
+ return executeWithMetrics("getCurrentMaxTxnId", (client) ->
client.getCurrentMaxTxnId(request));
}
public Cloud.BeginSubTxnResponse beginSubTxn(Cloud.BeginSubTxnRequest
request)
throws RpcException {
- return w.executeRequest((client) -> client.beginSubTxn(request));
+ return executeWithMetrics("beginSubTxn", (client) ->
client.beginSubTxn(request));
}
public Cloud.AbortSubTxnResponse abortSubTxn(Cloud.AbortSubTxnRequest
request)
throws RpcException {
- return w.executeRequest((client) -> client.abortSubTxn(request));
+ return executeWithMetrics("abortSubTxn", (client) ->
client.abortSubTxn(request));
}
public Cloud.CheckTxnConflictResponse
checkTxnConflict(Cloud.CheckTxnConflictRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.checkTxnConflict(request));
+ return executeWithMetrics("checkTxnConflict", (client) ->
client.checkTxnConflict(request));
}
public Cloud.CleanTxnLabelResponse
cleanTxnLabel(Cloud.CleanTxnLabelRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.cleanTxnLabel(request));
+ return executeWithMetrics("cleanTxnLabel", (client) ->
client.cleanTxnLabel(request));
}
public Cloud.GetClusterResponse getCluster(Cloud.GetClusterRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.getCluster(request));
+ return executeWithMetrics("getCluster", (client) ->
client.getCluster(request));
}
public Cloud.IndexResponse prepareIndex(Cloud.IndexRequest request) throws
RpcException {
- return w.executeRequest((client) -> client.prepareIndex(request));
+ return executeWithMetrics("prepareIndex", (client) ->
client.prepareIndex(request));
}
public Cloud.IndexResponse commitIndex(Cloud.IndexRequest request) throws
RpcException {
- return w.executeRequest((client) -> client.commitIndex(request));
+ return executeWithMetrics("commitIndex", (client) ->
client.commitIndex(request));
}
public Cloud.CheckKVResponse checkKv(Cloud.CheckKVRequest request) throws
RpcException {
- return w.executeRequest((client) -> client.checkKv(request));
+ return executeWithMetrics("checkKv", (client) ->
client.checkKv(request));
}
public Cloud.IndexResponse dropIndex(Cloud.IndexRequest request) throws
RpcException {
- return w.executeRequest((client) -> client.dropIndex(request));
+ return executeWithMetrics("dropIndex", (client) ->
client.dropIndex(request));
}
public Cloud.PartitionResponse preparePartition(Cloud.PartitionRequest
request)
throws RpcException {
- return w.executeRequest((client) -> client.preparePartition(request));
+ return executeWithMetrics("preparePartition", (client) ->
client.preparePartition(request));
}
public Cloud.PartitionResponse commitPartition(Cloud.PartitionRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.commitPartition(request));
+ return executeWithMetrics("commitPartition", (client) ->
client.commitPartition(request));
}
public Cloud.PartitionResponse dropPartition(Cloud.PartitionRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.dropPartition(request));
+ return executeWithMetrics("dropPartition", (client) ->
client.dropPartition(request));
}
public Cloud.GetTabletStatsResponse
getTabletStats(Cloud.GetTabletStatsRequest request) throws RpcException {
- return w.executeRequest((client) -> client.getTabletStats(request));
+ return executeWithMetrics("getTabletStats", (client) ->
client.getTabletStats(request));
}
public Cloud.CreateStageResponse createStage(Cloud.CreateStageRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.createStage(request));
+ return executeWithMetrics("createStage", (client) ->
client.createStage(request));
}
public Cloud.GetStageResponse getStage(Cloud.GetStageRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.getStage(request));
+ return executeWithMetrics("getStage", (client) ->
client.getStage(request));
}
public Cloud.DropStageResponse dropStage(Cloud.DropStageRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.dropStage(request));
+ return executeWithMetrics("dropStage", (client) ->
client.dropStage(request));
}
public Cloud.GetIamResponse getIam(Cloud.GetIamRequest request) throws
RpcException {
- return w.executeRequest((client) -> client.getIam(request));
+ return executeWithMetrics("getIam", (client) ->
client.getIam(request));
}
public Cloud.BeginCopyResponse beginCopy(Cloud.BeginCopyRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.beginCopy(request));
+ return executeWithMetrics("beginCopy", (client) ->
client.beginCopy(request));
}
public Cloud.FinishCopyResponse finishCopy(Cloud.FinishCopyRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.finishCopy(request));
+ return executeWithMetrics("finishCopy", (client) ->
client.finishCopy(request));
}
public Cloud.GetCopyJobResponse getCopyJob(Cloud.GetCopyJobRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.getCopyJob(request));
+ return executeWithMetrics("getCopyJob", (client) ->
client.getCopyJob(request));
}
public Cloud.GetCopyFilesResponse getCopyFiles(Cloud.GetCopyFilesRequest
request)
throws RpcException {
- return w.executeRequest((client) -> client.getCopyFiles(request));
+ return executeWithMetrics("getCopyFiles", (client) ->
client.getCopyFiles(request));
}
public Cloud.FilterCopyFilesResponse
filterCopyFiles(Cloud.FilterCopyFilesRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.filterCopyFiles(request));
+ return executeWithMetrics("filterCopyFiles", (client) ->
client.filterCopyFiles(request));
}
public Cloud.AlterClusterResponse alterCluster(Cloud.AlterClusterRequest
request)
throws RpcException {
- return w.executeRequest((client) -> client.alterCluster(request));
+ return executeWithMetrics("alterCluster", (client) ->
client.alterCluster(request));
}
public Cloud.GetDeleteBitmapUpdateLockResponse getDeleteBitmapUpdateLock(
Cloud.GetDeleteBitmapUpdateLockRequest request)
throws RpcException {
- return w.executeRequest((client) ->
client.getDeleteBitmapUpdateLock(request));
+ return executeWithMetrics("getDeleteBitmapUpdateLock",
+ (client) -> client.getDeleteBitmapUpdateLock(request));
}
public Cloud.RemoveDeleteBitmapUpdateLockResponse
removeDeleteBitmapUpdateLock(
Cloud.RemoveDeleteBitmapUpdateLockRequest request)
throws RpcException {
- return w.executeRequest((client) ->
client.removeDeleteBitmapUpdateLock(request));
+ return executeWithMetrics("removeDeleteBitmapUpdateLock",
+ (client) -> client.removeDeleteBitmapUpdateLock(request));
}
/**
@@ -426,93 +507,98 @@ public class MetaServiceProxy {
@Deprecated
public Cloud.AlterObjStoreInfoResponse
alterObjStoreInfo(Cloud.AlterObjStoreInfoRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.alterObjStoreInfo(request));
+ return executeWithMetrics("alterObjStoreInfo", (client) ->
client.alterObjStoreInfo(request));
}
public Cloud.AlterObjStoreInfoResponse
alterStorageVault(Cloud.AlterObjStoreInfoRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.alterStorageVault(request));
+ return executeWithMetrics("alterStorageVault", (client) ->
client.alterStorageVault(request));
}
public Cloud.FinishTabletJobResponse
finishTabletJob(Cloud.FinishTabletJobRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.finishTabletJob(request));
+ return executeWithMetrics("finishTabletJob", (client) ->
client.finishTabletJob(request));
}
public Cloud.GetRLTaskCommitAttachResponse
getRLTaskCommitAttach(Cloud.GetRLTaskCommitAttachRequest request)
throws RpcException {
- return w.executeRequest((client) ->
client.getRLTaskCommitAttach(request));
+ return executeWithMetrics("getRLTaskCommitAttach",
+ (client) -> client.getRLTaskCommitAttach(request));
}
public Cloud.ResetRLProgressResponse
resetRLProgress(Cloud.ResetRLProgressRequest request)
throws RpcException {
- return w.executeRequest((client) -> client.resetRLProgress(request));
+ return executeWithMetrics("resetRLProgress", (client) ->
client.resetRLProgress(request));
}
public Cloud.ResetStreamingJobOffsetResponse
resetStreamingJobOffset(Cloud.ResetStreamingJobOffsetRequest request)
throws RpcException {
- return w.executeRequest((client) ->
client.resetStreamingJobOffset(request));
+ return executeWithMetrics("resetStreamingJobOffset",
+ (client) -> client.resetStreamingJobOffset(request));
}
public Cloud.GetObjStoreInfoResponse
getObjStoreInfo(Cloud.GetObjStoreInfoRequest request) throws
RpcException {
- return w.executeRequest((client) -> client.getObjStoreInfo(request));
+ return executeWithMetrics("getObjStoreInfo", (client) ->
client.getObjStoreInfo(request));
}
public Cloud.AbortTxnWithCoordinatorResponse
abortTxnWithCoordinator(Cloud.AbortTxnWithCoordinatorRequest
request) throws RpcException {
- return w.executeRequest((client) ->
client.abortTxnWithCoordinator(request));
+ return executeWithMetrics("abortTxnWithCoordinator",
+ (client) -> client.abortTxnWithCoordinator(request));
}
public Cloud.GetPrepareTxnByCoordinatorResponse
getPrepareTxnByCoordinator(Cloud.GetPrepareTxnByCoordinatorRequest
request) throws RpcException {
- return w.executeRequest((client) ->
client.getPrepareTxnByCoordinator(request));
+ return executeWithMetrics("getPrepareTxnByCoordinator",
+ (client) -> client.getPrepareTxnByCoordinator(request));
}
public Cloud.CreateInstanceResponse
createInstance(Cloud.CreateInstanceRequest request) throws RpcException {
- return w.executeRequest((client) -> client.createInstance(request));
+ return executeWithMetrics("createInstance", (client) ->
client.createInstance(request));
}
public Cloud.GetStreamingTaskCommitAttachResponse
getStreamingTaskCommitAttach(
Cloud.GetStreamingTaskCommitAttachRequest request) throws
RpcException {
- return w.executeRequest((client) ->
client.getStreamingTaskCommitAttach(request));
+ return executeWithMetrics("getStreamingTaskCommitAttach",
+ (client) -> client.getStreamingTaskCommitAttach(request));
}
public Cloud.DeleteStreamingJobResponse
deleteStreamingJob(Cloud.DeleteStreamingJobRequest request)
throws RpcException {
- return w.executeRequest((client) ->
client.deleteStreamingJob(request));
+ return executeWithMetrics("deleteStreamingJob", (client) ->
client.deleteStreamingJob(request));
}
public Cloud.AlterInstanceResponse
alterInstance(Cloud.AlterInstanceRequest request) throws RpcException {
- return w.executeRequest((client) -> client.alterInstance(request));
+ return executeWithMetrics("alterInstance", (client) ->
client.alterInstance(request));
}
public Cloud.BeginSnapshotResponse
beginSnapshot(Cloud.BeginSnapshotRequest request) throws RpcException {
- return w.executeRequest((client) -> client.beginSnapshot(request));
+ return executeWithMetrics("beginSnapshot", (client) ->
client.beginSnapshot(request));
}
public Cloud.UpdateSnapshotResponse
updateSnapshot(Cloud.UpdateSnapshotRequest request) throws RpcException {
- return w.executeRequest((client) -> client.updateSnapshot(request));
+ return executeWithMetrics("updateSnapshot", (client) ->
client.updateSnapshot(request));
}
public Cloud.CommitSnapshotResponse
commitSnapshot(Cloud.CommitSnapshotRequest request) throws RpcException {
- return w.executeRequest((client) -> client.commitSnapshot(request));
+ return executeWithMetrics("commitSnapshot", (client) ->
client.commitSnapshot(request));
}
public Cloud.AbortSnapshotResponse
abortSnapshot(Cloud.AbortSnapshotRequest request) throws RpcException {
- return w.executeRequest((client) -> client.abortSnapshot(request));
+ return executeWithMetrics("abortSnapshot", (client) ->
client.abortSnapshot(request));
}
public Cloud.ListSnapshotResponse listSnapshot(Cloud.ListSnapshotRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.listSnapshot(request));
+ return executeWithMetrics("listSnapshot", (client) ->
client.listSnapshot(request));
}
public Cloud.DropSnapshotResponse dropSnapshot(Cloud.DropSnapshotRequest
request) throws RpcException {
- return w.executeRequest((client) -> client.dropSnapshot(request));
+ return executeWithMetrics("dropSnapshot", (client) ->
client.dropSnapshot(request));
}
public Cloud.CloneInstanceResponse
cloneInstance(Cloud.CloneInstanceRequest request) throws RpcException {
- return w.executeRequest((client) -> client.cloneInstance(request));
+ return executeWithMetrics("cloneInstance", (client) ->
client.cloneInstance(request));
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java
index 6877aea5c50..4a1824d0256 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/CloudMetrics.java
@@ -51,6 +51,19 @@ public class CloudMetrics {
protected static AutoMappedMetric<LongCounterMetric>
CLUSTER_CLOUD_SMOOTH_UPGRADE_BALANCE_NUM;
protected static AutoMappedMetric<LongCounterMetric>
CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM;
+ // Per-method meta-service RPC metrics
+ public static AutoMappedMetric<LongCounterMetric> META_SERVICE_RPC_TOTAL;
+ public static AutoMappedMetric<LongCounterMetric> META_SERVICE_RPC_FAILED;
+ public static AutoMappedMetric<LongCounterMetric> META_SERVICE_RPC_RETRY;
+ public static AutoMappedMetric<GaugeMetricImpl<Double>>
META_SERVICE_RPC_PER_SECOND;
+ public static AutoMappedMetric<Histogram> META_SERVICE_RPC_LATENCY;
+
+ // Aggregate meta-service metrics
+ public static LongCounterMetric META_SERVICE_RPC_ALL_TOTAL;
+ public static LongCounterMetric META_SERVICE_RPC_ALL_FAILED;
+ public static LongCounterMetric META_SERVICE_RPC_ALL_RETRY;
+ public static GaugeMetricImpl<Double> META_SERVICE_RPC_ALL_PER_SECOND;
+
protected static void init() {
if (Config.isNotCloudMode()) {
return;
@@ -124,5 +137,41 @@ public class CloudMetrics {
CLUSTER_CLOUD_WARM_UP_CACHE_BALANCE_NUM = new AutoMappedMetric<>(name
-> new LongCounterMetric(
"cloud_warm_up_balance_num", MetricUnit.NOUNIT,
"current cluster cloud warm up cache sync edit log number"));
+
+ // Per-method meta-service RPC metrics
+ META_SERVICE_RPC_TOTAL = MetricRepo.addLabeledMetrics("method", () ->
+ new LongCounterMetric("meta_service_rpc_total", MetricUnit.NOUNIT,
+ "total meta service RPC calls"));
+ META_SERVICE_RPC_FAILED = MetricRepo.addLabeledMetrics("method", () ->
+ new LongCounterMetric("meta_service_rpc_failed", MetricUnit.NOUNIT,
+ "failed meta service RPC calls"));
+ META_SERVICE_RPC_RETRY = MetricRepo.addLabeledMetrics("method", () ->
+ new LongCounterMetric("meta_service_rpc_retry", MetricUnit.NOUNIT,
+ "meta service RPC retry attempts"));
+ META_SERVICE_RPC_PER_SECOND = new AutoMappedMetric<>(methodName -> {
+ GaugeMetricImpl<Double> gauge = new
GaugeMetricImpl<>("meta_service_rpc_per_second",
+ MetricUnit.NOUNIT, "meta service RPC requests per second",
0.0);
+ gauge.addLabel(new MetricLabel("method", methodName));
+ return gauge;
+ });
+ META_SERVICE_RPC_LATENCY = new AutoMappedMetric<>(methodName -> {
+ String metricName = MetricRegistry.name("meta_service", "rpc",
"latency", "ms",
+ "method=" + methodName);
+ return MetricRepo.METRIC_REGISTER.histogram(metricName);
+ });
+
+ // Aggregate meta-service metrics
+ META_SERVICE_RPC_ALL_TOTAL = new
LongCounterMetric("meta_service_rpc_all_total",
+ MetricUnit.NOUNIT, "total meta service RPC calls across all
methods");
+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(META_SERVICE_RPC_ALL_TOTAL);
+ META_SERVICE_RPC_ALL_FAILED = new
LongCounterMetric("meta_service_rpc_all_failed",
+ MetricUnit.NOUNIT, "total failed meta service RPC calls");
+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(META_SERVICE_RPC_ALL_FAILED);
+ META_SERVICE_RPC_ALL_RETRY = new
LongCounterMetric("meta_service_rpc_all_retry",
+ MetricUnit.NOUNIT, "total meta service RPC retry attempts");
+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(META_SERVICE_RPC_ALL_RETRY);
+ META_SERVICE_RPC_ALL_PER_SECOND = new
GaugeMetricImpl<>("meta_service_rpc_all_per_second",
+ MetricUnit.NOUNIT, "meta service RPC requests per second (all
methods)", 0.0);
+
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(META_SERVICE_RPC_ALL_PER_SECOND);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
index 06d206377d7..d15b3296efd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricCalculator.java
@@ -34,10 +34,12 @@ public class MetricCalculator extends TimerTask {
private long lastRequestCounter = -1;
private long lastQueryErrCounter = -1;
private long lastQuerySlowCounter = -1;
+ private long lastMetaServiceRpcCounter = -1;
private Map<String, Long> clusterLastRequestCounter = new HashMap<>();
private Map<String, Long> clusterLastQueryCounter = new HashMap<>();
private Map<String, Long> clusterLastQueryErrCounter = new HashMap<>();
+ private Map<String, Long> metaServiceLastRpcCounter = new HashMap<>();
@Override
public void run() {
@@ -52,6 +54,9 @@ public class MetricCalculator extends TimerTask {
lastRequestCounter = MetricRepo.COUNTER_REQUEST_ALL.getValue();
lastQueryErrCounter = MetricRepo.COUNTER_QUERY_ERR.getValue();
lastQuerySlowCounter = MetricRepo.COUNTER_QUERY_SLOW.getValue();
+ if (Config.isCloudMode()) {
+ lastMetaServiceRpcCounter =
CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.getValue();
+ }
initCloudMetrics();
return;
}
@@ -82,6 +87,14 @@ public class MetricCalculator extends TimerTask {
MetricRepo.GAUGE_QUERY_SLOW_RATE.setValue(slowRate < 0 ? 0.0 :
slowRate);
lastQuerySlowCounter = currentSlowCounter;
+ // Calculate aggregate meta-service RPS
+ if (Config.isCloudMode()) {
+ long currentMetaServiceCounter =
CloudMetrics.META_SERVICE_RPC_ALL_TOTAL.getValue();
+ double metaServiceRps = (double) (currentMetaServiceCounter -
lastMetaServiceRpcCounter) / interval;
+
CloudMetrics.META_SERVICE_RPC_ALL_PER_SECOND.setValue(metaServiceRps < 0 ? 0.0
: metaServiceRps);
+ lastMetaServiceRpcCounter = currentMetaServiceCounter;
+ }
+
updateCloudMetrics(interval);
lastTs = currentTs;
@@ -123,6 +136,15 @@ public class MetricCalculator extends TimerTask {
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(metric);
});
}
+
+ // Initialize meta-service RPC metrics
+ Map<String, LongCounterMetric> metaServiceRpcMetrics =
CloudMetrics.META_SERVICE_RPC_TOTAL.getMetrics();
+ if (metaServiceRpcMetrics != null) {
+ metaServiceRpcMetrics.forEach((methodName, metric) -> {
+ metaServiceLastRpcCounter.put(methodName, metric.getValue());
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(metric);
+ });
+ }
}
private void updateCloudMetrics(long interval) {
@@ -165,5 +187,18 @@ public class MetricCalculator extends TimerTask {
clusterLastQueryErrCounter.put(clusterId, metric.getValue());
});
}
+
+ // Update meta-service per-method RPS
+ Map<String, LongCounterMetric> metaServiceRpcMetrics =
CloudMetrics.META_SERVICE_RPC_TOTAL.getMetrics();
+ if (metaServiceRpcMetrics != null) {
+ metaServiceRpcMetrics.forEach((methodName, metric) -> {
+ double rps = (double) (metric.getValue() -
metaServiceLastRpcCounter.getOrDefault(methodName, 0L))
+ / interval;
+ rps = Double.max(rps, 0);
+ MetricRepo.updateMetaServiceRpcPerSecond(methodName, rps,
metric.getLabels());
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(metric);
+ metaServiceLastRpcCounter.put(methodName, metric.getValue());
+ });
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
index e341bf80759..86ad7fe7dfa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/metric/MetricRepo.java
@@ -1602,6 +1602,16 @@ public final class MetricRepo {
MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
}
+ public static void updateMetaServiceRpcPerSecond(String methodName, double
value, List<MetricLabel> labels) {
+ if (!MetricRepo.isInit || Config.isNotCloudMode() ||
Strings.isNullOrEmpty(methodName)) {
+ return;
+ }
+ GaugeMetricImpl<Double> gauge =
CloudMetrics.META_SERVICE_RPC_PER_SECOND.getOrAdd(methodName);
+ gauge.setValue(value);
+ gauge.setLabels(labels);
+ MetricRepo.DORIS_METRIC_REGISTER.addMetrics(gauge);
+ }
+
public static void updateClusterBackendAlive(String clusterName, String
clusterId, String ipAddress,
boolean alive) {
if (!MetricRepo.isInit || Config.isNotCloudMode() ||
Strings.isNullOrEmpty(clusterName)
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
index 2d5a4353265..876170e6729 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/cloud/rpc/MetaServiceProxyTest.java
@@ -71,7 +71,7 @@ public class MetaServiceProxyTest {
lastConnTimeMs.add(0L);
MetaServiceProxy.MetaServiceClientWrapper wrapper =
Deencapsulation.getField(proxy, "w");
- String response = wrapper.executeRequest((ignored) -> "ok");
+ String response = wrapper.executeRequest("ignored", (ignored) -> "ok");
Assert.assertEquals("ok", response);
Mockito.verify(client, Mockito.never()).shutdown(Mockito.anyBoolean());
}
@@ -93,7 +93,7 @@ public class MetaServiceProxyTest {
MetaServiceProxy.MetaServiceClientWrapper wrapper =
Deencapsulation.getField(proxy, "w");
try {
- wrapper.executeRequest((ignored) -> {
+ wrapper.executeRequest("ignored", (ignored) -> {
throw new RuntimeException("rpc failed");
});
Assert.fail("should throw RpcException");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]