This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f73fe6c4fb [ISSUE #9885] Fix tiered store cache count and bytes
metrics (#9886)
f73fe6c4fb is described below
commit f73fe6c4fb80f07c687bd76cad649e28903a2031
Author: majialong <[email protected]>
AuthorDate: Thu Dec 11 10:27:08 2025 +0800
[ISSUE #9885] Fix tiered store cache count and bytes metrics (#9886)
---
tieredstore/README.md | 4 +-
.../metrics/TieredStoreMetricsManager.java | 10 +-
.../metrics/TieredStoreMetricsManagerTest.java | 107 +++++++++++++++++++++
3 files changed, 115 insertions(+), 6 deletions(-)
diff --git a/tieredstore/README.md b/tieredstore/README.md
index 6b5ecc8c8d..1532fc3b5f 100644
--- a/tieredstore/README.md
+++ b/tieredstore/README.md
@@ -45,12 +45,12 @@ Tiered storage provides some useful metrics, see
[RIP-46](https://github.com/apa
| Histogram | rocketmq_tiered_store_provider_upload_bytes | byte
|
| Histogram | rocketmq_tiered_store_provider_download_bytes | byte
|
| Gauge | rocketmq_tiered_store_dispatch_behind |
|
-| Gauge | rocketmq_tiered_store_dispatch_latency | byte
|
+| Gauge | rocketmq_tiered_store_dispatch_latency |
milliseconds |
| Counter | rocketmq_tiered_store_messages_dispatch_total |
|
| Counter | rocketmq_tiered_store_messages_out_total |
|
| Counter | rocketmq_tiered_store_get_message_fallback_total |
|
| Gauge | rocketmq_tiered_store_read_ahead_cache_count |
|
-| Gauge | rocketmq_tiered_store_read_ahead_cache_bytes | byte
|
+| Gauge | rocketmq_tiered_store_read_ahead_cache_bytes | bytes
|
| Counter | rocketmq_tiered_store_read_ahead_cache_access_total |
|
| Counter | rocketmq_tiered_store_read_ahead_cache_hit_total |
|
| Gauge | rocketmq_storage_message_reserve_time |
milliseconds |
diff --git
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 4d08328483..e0ebff08cb 100644
---
a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++
b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -207,7 +207,7 @@ public class TieredStoreMetricsManager {
dispatchLatency = meter.gaugeBuilder(GAUGE_DISPATCH_LATENCY)
.setDescription("Tiered store dispatch latency")
- .setUnit("seconds")
+ .setUnit("milliseconds")
.ofLongs()
.buildWithCallback(measurement -> {
for (FlatMessageFile flatFile :
flatFileStore.deepCopyFlatFileToList()) {
@@ -261,7 +261,7 @@ public class TieredStoreMetricsManager {
.ofLongs()
.buildWithCallback(measurement -> {
if (fetcher instanceof MessageStoreFetcherImpl) {
- long count = ((MessageStoreFetcherImpl)
fetcher).getFetcherCache().stats().loadCount();
+ long count = ((MessageStoreFetcherImpl)
fetcher).getFetcherCache().estimatedSize();
measurement.record(count, newAttributesBuilder().build());
}
});
@@ -272,8 +272,10 @@ public class TieredStoreMetricsManager {
.ofLongs()
.buildWithCallback(measurement -> {
if (fetcher instanceof MessageStoreFetcherImpl) {
- long count = ((MessageStoreFetcherImpl)
fetcher).getFetcherCache().estimatedSize();
- measurement.record(count, newAttributesBuilder().build());
+ long bytes = ((MessageStoreFetcherImpl)
fetcher).getFetcherCache().policy().eviction()
+ .map(eviction -> eviction.weightedSize().orElse(0L))
+ .orElse(0L);
+ measurement.record(bytes, newAttributesBuilder().build());
}
});
diff --git
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
index cc4d9e2c68..0434138961 100644
---
a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
+++
b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManagerTest.java
@@ -16,13 +16,26 @@
*/
package org.apache.rocketmq.tieredstore.metrics;
+import com.github.benmanes.caffeine.cache.Cache;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.metrics.DoubleGaugeBuilder;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
import io.opentelemetry.sdk.OpenTelemetrySdk;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.tieredstore.MessageStoreConfig;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
+import org.apache.rocketmq.tieredstore.common.SelectBufferResult;
import org.apache.rocketmq.tieredstore.core.MessageStoreFetcherImpl;
import org.apache.rocketmq.tieredstore.file.FlatFileStore;
import org.apache.rocketmq.tieredstore.provider.PosixFileSegment;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -52,4 +65,98 @@ public class TieredStoreMetricsManagerTest {
public void newAttributesBuilder() {
TieredStoreMetricsManager.newAttributesBuilder();
}
+
+ @Test
+ public void testCacheCountMetric() {
+ MessageStoreConfig storeConfig = new MessageStoreConfig();
+ TieredMessageStore messageStore =
Mockito.mock(TieredMessageStore.class);
+ Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
+
Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class));
+ // The fetcher will create real cache
+ MessageStoreFetcherImpl fetcher = new
MessageStoreFetcherImpl(messageStore);
+
+ AtomicLong capturedCacheCount = new AtomicLong(-1);
+ Meter mockMeter =
createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_COUNT,
capturedCacheCount);
+
+ // Prepare cache before init so the gauge callback sees a populated
cache instead of an empty one.
+ int[] bufferSizes = prepareTestCache(fetcher);
+
+ TieredStoreMetricsManager.init(mockMeter,
+ null, storeConfig, fetcher,
+ Mockito.mock(FlatFileStore.class),
Mockito.mock(DefaultMessageStore.class));
+
+ // CacheCount gauge should report the number of cached entries.
+ Assert.assertEquals(bufferSizes.length, capturedCacheCount.get());
+ }
+
+ @Test
+ public void testCacheBytesMetric() {
+ MessageStoreConfig storeConfig = new MessageStoreConfig();
+ TieredMessageStore messageStore =
Mockito.mock(TieredMessageStore.class);
+ Mockito.when(messageStore.getStoreConfig()).thenReturn(storeConfig);
+
Mockito.when(messageStore.getFlatFileStore()).thenReturn(Mockito.mock(FlatFileStore.class));
+ // The fetcher will create real cache
+ MessageStoreFetcherImpl fetcher = new
MessageStoreFetcherImpl(messageStore);
+
+ AtomicLong capturedCacheBytes = new AtomicLong(-1);
+ Meter mockMeter =
createMockMeter(TieredStoreMetricsConstant.GAUGE_CACHE_BYTES,
capturedCacheBytes);
+
+ // Prepare cache before init so the gauge callback sees a populated
cache instead of an empty one.
+ int[] bufferSizes = prepareTestCache(fetcher);
+
+ TieredStoreMetricsManager.init(mockMeter,
+ null, storeConfig, fetcher,
+ Mockito.mock(FlatFileStore.class),
Mockito.mock(DefaultMessageStore.class));
+
+ // CacheBytes gauge should report the sum of all cached buffer sizes.
+ int expectedSum = Arrays.stream(bufferSizes).sum();
+ Assert.assertEquals(expectedSum, capturedCacheBytes.get());
+ }
+
+ private Meter createMockMeter(String targetMetricName, AtomicLong
capturedValue) {
+ Meter mockMeter = Mockito.mock(Meter.class,
Mockito.RETURNS_DEEP_STUBS);
+
+ // Setup target gauge builder chain to capture the callback value
+ DoubleGaugeBuilder targetGaugeBuilder =
Mockito.mock(DoubleGaugeBuilder.class, Mockito.RETURNS_DEEP_STUBS);
+
Mockito.when(mockMeter.gaugeBuilder(targetMetricName)).thenReturn(targetGaugeBuilder);
+
Mockito.when(targetGaugeBuilder.setDescription(Mockito.anyString())).thenReturn(targetGaugeBuilder);
+
Mockito.when(targetGaugeBuilder.setUnit(Mockito.anyString())).thenReturn(targetGaugeBuilder);
+
Mockito.when(targetGaugeBuilder.ofLongs().buildWithCallback(Mockito.any(Consumer.class)))
+ .thenAnswer(invocation -> {
+ Consumer<ObservableLongMeasurement> callback =
invocation.getArgument(0);
+ // Immediately invoke the callback to capture the current
cache state
+ callback.accept(new ObservableLongMeasurement() {
+ @Override
+ public void record(long value) {
+ capturedValue.set(value);
+ }
+
+ @Override
+ public void record(long value, Attributes attributes) {
+ capturedValue.set(value);
+ }
+ });
+ return Mockito.mock(ObservableLongGauge.class);
+ });
+
+ return mockMeter;
+ }
+
+ private int[] prepareTestCache(MessageStoreFetcherImpl fetcher) {
+ Cache<String, SelectBufferResult> cache = fetcher.getFetcherCache();
+ String topic = "TestTopic";
+ MessageQueue mq1 = new MessageQueue(topic, "broker", 0);
+ MessageQueue mq2 = new MessageQueue(topic, "broker", 1);
+
+ int[] bufferSizes = {100, 200, 150, 300};
+ for (int i = 0; i < bufferSizes.length; i++) {
+ SelectBufferResult result = new SelectBufferResult(
+ ByteBuffer.allocate(bufferSizes[i]), 0L, bufferSizes[i], 0L);
+ MessageQueue mq = i < 2 ? mq1 : mq2;
+ String key = String.format("%s@%d@%d", mq.getTopic(),
mq.getQueueId(), (i + 1) * 100L);
+ cache.put(key, result);
+ }
+ return bufferSizes;
+ }
+
}