This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new c270c60f85 [ISSUE #10441] Reduce per-RPC allocation in metrics by
caching static AttributeKey instances (#10443)
c270c60f85 is described below
commit c270c60f8592aef32722d40c3d1bb1e3168b3aa1
Author: Jiahua Wang <[email protected]>
AuthorDate: Fri Jun 19 09:41:57 2026 +0800
[ISSUE #10441] Reduce per-RPC allocation in metrics by caching static
AttributeKey instances (#10443)
Co-authored-by: wangjiahua.wjh <[email protected]>
---
.../broker/metrics/BrokerMetricsConstant.java | 19 ++++++
.../broker/metrics/BrokerMetricsManager.java | 68 +++++++++++-----------
.../rocketmq/broker/metrics/PopMetricsManager.java | 16 ++---
.../remoting/metrics/RemotingMetricsConstant.java | 10 ++++
.../remoting/metrics/RemotingMetricsManager.java | 5 +-
5 files changed, 74 insertions(+), 44 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
index 4b319f12f6..e87ce9ad02 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsConstant.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.broker.metrics;
+import io.opentelemetry.api.common.AttributeKey;
+
public class BrokerMetricsConstant {
public static final String OPEN_TELEMETRY_METER_NAME = "broker-meter";
@@ -64,4 +66,21 @@ public class BrokerMetricsConstant {
public static final String LABEL_LANGUAGE = "language";
public static final String LABEL_VERSION = "version";
public static final String LABEL_CONSUME_MODE = "consume_mode";
+
+ // Pre-built typed AttributeKey singletons. Use these in AttributesBuilder.put()
+ // on hot paths to avoid allocating a fresh InternalAttributeKeyImpl per call.
+ public static final AttributeKey<String> LABEL_CLUSTER_NAME_KEY =
AttributeKey.stringKey(LABEL_CLUSTER_NAME);
+ public static final AttributeKey<String> LABEL_NODE_TYPE_KEY =
AttributeKey.stringKey(LABEL_NODE_TYPE);
+ public static final AttributeKey<String> LABEL_NODE_ID_KEY =
AttributeKey.stringKey(LABEL_NODE_ID);
+ public static final AttributeKey<String> LABEL_AGGREGATION_KEY =
AttributeKey.stringKey(LABEL_AGGREGATION);
+ public static final AttributeKey<String> LABEL_PROCESSOR_KEY =
AttributeKey.stringKey(LABEL_PROCESSOR);
+ public static final AttributeKey<String> LABEL_TOPIC_KEY =
AttributeKey.stringKey(LABEL_TOPIC);
+ public static final AttributeKey<String> LABEL_INVOCATION_STATUS_KEY =
AttributeKey.stringKey(LABEL_INVOCATION_STATUS);
+ public static final AttributeKey<Boolean> LABEL_IS_RETRY_KEY =
AttributeKey.booleanKey(LABEL_IS_RETRY);
+ public static final AttributeKey<Boolean> LABEL_IS_SYSTEM_KEY =
AttributeKey.booleanKey(LABEL_IS_SYSTEM);
+ public static final AttributeKey<String> LABEL_CONSUMER_GROUP_KEY =
AttributeKey.stringKey(LABEL_CONSUMER_GROUP);
+ public static final AttributeKey<String> LABEL_MESSAGE_TYPE_KEY =
AttributeKey.stringKey(LABEL_MESSAGE_TYPE);
+ public static final AttributeKey<String> LABEL_LANGUAGE_KEY =
AttributeKey.stringKey(LABEL_LANGUAGE);
+ public static final AttributeKey<String> LABEL_VERSION_KEY =
AttributeKey.stringKey(LABEL_VERSION);
+ public static final AttributeKey<String> LABEL_CONSUME_MODE_KEY =
AttributeKey.stringKey(LABEL_CONSUME_MODE);
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 6d32c7ae74..eda80dfd44 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -101,19 +101,19 @@ import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_AGGREGATION;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CLUSTER_NAME;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP_KEY;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUME_MODE_KEY;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_RETRY_KEY;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_IS_SYSTEM_KEY;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_LANGUAGE_KEY;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_ID;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_NODE_TYPE;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_VERSION;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_PROCESSOR_KEY;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC_KEY;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_VERSION_KEY;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.NODE_TYPE_BROKER;
import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.OPEN_TELEMETRY_METER_NAME;
-import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE;
+import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE_KEY;
import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING;
public class BrokerMetricsManager {
@@ -197,10 +197,10 @@ public class BrokerMetricsManager {
private Attributes
buildLagAttributes(ConsumerLagCalculator.BaseCalculateResult result) {
AttributesBuilder attributesBuilder = newAttributesBuilder();
- attributesBuilder.put(LABEL_CONSUMER_GROUP, result.group);
- attributesBuilder.put(LABEL_TOPIC, result.topic);
- attributesBuilder.put(LABEL_IS_RETRY, result.isRetry);
- attributesBuilder.put(LABEL_IS_SYSTEM, isSystem(result.topic,
result.group));
+ attributesBuilder.put(LABEL_CONSUMER_GROUP_KEY, result.group);
+ attributesBuilder.put(LABEL_TOPIC_KEY, result.topic);
+ attributesBuilder.put(LABEL_IS_RETRY_KEY, result.isRetry);
+ attributesBuilder.put(LABEL_IS_SYSTEM_KEY, isSystem(result.topic,
result.group));
return attributesBuilder.build();
}
@@ -574,18 +574,18 @@ public class BrokerMetricsManager {
.setDescription("Request processor watermark")
.ofLongs()
.buildWithCallback(measurement -> {
-
measurement.record(brokerController.getSendThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "send").build());
-
measurement.record(brokerController.getAsyncPutThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "async_put").build());
-
measurement.record(brokerController.getPullThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "pull").build());
-
measurement.record(brokerController.getAckThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "ack").build());
-
measurement.record(brokerController.getQueryThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "query_message").build());
-
measurement.record(brokerController.getClientManagerThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "client_manager").build());
-
measurement.record(brokerController.getHeartbeatThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "heartbeat").build());
-
measurement.record(brokerController.getLitePullThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "lite_pull").build());
-
measurement.record(brokerController.getEndTransactionThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "transaction").build());
-
measurement.record(brokerController.getConsumerManagerThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "consumer_manager").build());
-
measurement.record(brokerController.getAdminBrokerThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "admin").build());
-
measurement.record(brokerController.getReplyThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR, "reply").build());
+
measurement.record(brokerController.getSendThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "send").build());
+
measurement.record(brokerController.getAsyncPutThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "async_put").build());
+
measurement.record(brokerController.getPullThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "pull").build());
+
measurement.record(brokerController.getAckThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "ack").build());
+
measurement.record(brokerController.getQueryThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "query_message").build());
+
measurement.record(brokerController.getClientManagerThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "client_manager").build());
+
measurement.record(brokerController.getHeartbeatThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "heartbeat").build());
+
measurement.record(brokerController.getLitePullThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "lite_pull").build());
+
measurement.record(brokerController.getEndTransactionThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "transaction").build());
+
measurement.record(brokerController.getConsumerManagerThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "consumer_manager").build());
+
measurement.record(brokerController.getAdminBrokerThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "admin").build());
+
measurement.record(brokerController.getReplyThreadPoolQueue().size(),
newAttributesBuilder().put(LABEL_PROCESSOR_KEY, "reply").build());
});
brokerPermission = brokerMeter.gaugeBuilder(GAUGE_BROKER_PERMISSION)
@@ -665,9 +665,9 @@ public class BrokerMetricsManager {
});
metricsMap.forEach((attr, count) -> {
Attributes attributes = newAttributesBuilder()
- .put(LABEL_LANGUAGE,
attr.language.name().toLowerCase())
- .put(LABEL_VERSION,
MQVersion.getVersionDesc(attr.version).toLowerCase())
- .put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING)
+ .put(LABEL_LANGUAGE_KEY,
attr.language.name().toLowerCase())
+ .put(LABEL_VERSION_KEY,
MQVersion.getVersionDesc(attr.version).toLowerCase())
+ .put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING)
.build();
measurement.record(count, attributes);
});
@@ -691,12 +691,12 @@ public class BrokerMetricsManager {
});
metricsMap.forEach((attr, count) -> {
Attributes attributes = newAttributesBuilder()
- .put(LABEL_CONSUMER_GROUP, attr.group)
- .put(LABEL_LANGUAGE,
attr.language.name().toLowerCase())
- .put(LABEL_VERSION,
MQVersion.getVersionDesc(attr.version).toLowerCase())
- .put(LABEL_CONSUME_MODE,
attr.consumeMode.getTypeCN().toLowerCase())
- .put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING)
- .put(LABEL_IS_SYSTEM, isSystemGroup(attr.group))
+ .put(LABEL_CONSUMER_GROUP_KEY, attr.group)
+ .put(LABEL_LANGUAGE_KEY,
attr.language.name().toLowerCase())
+ .put(LABEL_VERSION_KEY,
MQVersion.getVersionDesc(attr.version).toLowerCase())
+ .put(LABEL_CONSUME_MODE_KEY,
attr.consumeMode.getTypeCN().toLowerCase())
+ .put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING)
+ .put(LABEL_IS_SYSTEM_KEY, isSystemGroup(attr.group))
.build();
measurement.record(count, attributes);
});
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
index 1fb6e892bf..36a256f2e9 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
@@ -45,8 +45,8 @@ import org.apache.rocketmq.store.pop.PopCheckPoint;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
-import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP_KEY;
+import static
org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC_KEY;
import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL;
import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL;
import static
org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL;
@@ -181,8 +181,8 @@ public class PopMetricsManager {
public void incPopRevivePutCount(String group, String topic,
PopReviveMessageType messageType,
PutMessageStatus status, int num) {
Attributes attributes = this.newAttributesBuilder()
- .put(LABEL_CONSUMER_GROUP, group)
- .put(LABEL_TOPIC, topic)
+ .put(LABEL_CONSUMER_GROUP_KEY, group)
+ .put(LABEL_TOPIC_KEY, topic)
.put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
.put(LABEL_PUT_STATUS, status.name())
.build();
@@ -201,8 +201,8 @@ public class PopMetricsManager {
int num) {
AttributesBuilder builder = this.newAttributesBuilder();
Attributes attributes = builder
- .put(LABEL_CONSUMER_GROUP, group)
- .put(LABEL_TOPIC, topic)
+ .put(LABEL_CONSUMER_GROUP_KEY, group)
+ .put(LABEL_TOPIC_KEY, topic)
.put(LABEL_QUEUE_ID, queueId)
.put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
.build();
@@ -212,8 +212,8 @@ public class PopMetricsManager {
public void incPopReviveRetryMessageCount(PopCheckPoint checkPoint,
PutMessageStatus status) {
AttributesBuilder builder = this.newAttributesBuilder();
Attributes attributes = builder
- .put(LABEL_CONSUMER_GROUP, checkPoint.getCId())
- .put(LABEL_TOPIC, checkPoint.getTopic())
+ .put(LABEL_CONSUMER_GROUP_KEY, checkPoint.getCId())
+ .put(LABEL_TOPIC_KEY, checkPoint.getTopic())
.put(LABEL_PUT_STATUS, status.name())
.build();
this.popReviveRetryMessageTotal.add(1, attributes);
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
index f9b3e4c6fa..db9999b78f 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsConstant.java
@@ -16,6 +16,8 @@
*/
package org.apache.rocketmq.remoting.metrics;
+import io.opentelemetry.api.common.AttributeKey;
+
public class RemotingMetricsConstant {
public static final String HISTOGRAM_RPC_LATENCY = "rocketmq_rpc_latency";
public static final String LABEL_PROTOCOL_TYPE = "protocol_type";
@@ -24,6 +26,14 @@ public class RemotingMetricsConstant {
public static final String LABEL_IS_LONG_POLLING = "is_long_polling";
public static final String LABEL_RESULT = "result";
+ // Pre-built typed AttributeKey singletons. Use these in AttributesBuilder.put()
+ // on hot paths to avoid allocating a fresh InternalAttributeKeyImpl per call.
+ public static final AttributeKey<String> LABEL_PROTOCOL_TYPE_KEY =
AttributeKey.stringKey(LABEL_PROTOCOL_TYPE);
+ public static final AttributeKey<String> LABEL_REQUEST_CODE_KEY =
AttributeKey.stringKey(LABEL_REQUEST_CODE);
+ public static final AttributeKey<String> LABEL_RESPONSE_CODE_KEY =
AttributeKey.stringKey(LABEL_RESPONSE_CODE);
+ public static final AttributeKey<Boolean> LABEL_IS_LONG_POLLING_KEY =
AttributeKey.booleanKey(LABEL_IS_LONG_POLLING);
+ public static final AttributeKey<String> LABEL_RESULT_KEY =
AttributeKey.stringKey(LABEL_RESULT);
+
public static final String PROTOCOL_TYPE_REMOTING = "remoting";
public static final String RESULT_ONEWAY = "oneway";
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
index 5da06dcb5b..09bff2c31e 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/metrics/RemotingMetricsManager.java
@@ -35,8 +35,9 @@ import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.metrics.NopLongHistogram;
import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.HISTOGRAM_RPC_LATENCY;
-import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE;
+import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_PROTOCOL_TYPE_KEY;
import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.PROTOCOL_TYPE_REMOTING;
+
import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_CANCELED;
import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_SUCCESS;
import static
org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.RESULT_WRITE_CHANNEL_FAILED;
@@ -53,7 +54,7 @@ public class RemotingMetricsManager {
return Attributes.builder();
}
return this.attributesBuilderSupplier.get()
- .put(LABEL_PROTOCOL_TYPE, PROTOCOL_TYPE_REMOTING);
+ .put(LABEL_PROTOCOL_TYPE_KEY, PROTOCOL_TYPE_REMOTING);
}
public void initMetrics(Meter meter, Supplier<AttributesBuilder>
attributesBuilderSupplier) {