This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 03313a9b0 [metric] Change some metrics from bucket-level to
table-level and server-level (#1531)
03313a9b0 is described below
commit 03313a9b02dca57c87c406f0ecf396b08fa8726a
Author: xiaozhou <[email protected]>
AuthorDate: Fri Sep 12 14:07:44 2025 +0800
[metric] Change some metrics from bucket-level to table-level and
server-level (#1531)
---
.../java/org/apache/fluss/metrics/MetricNames.java | 30 ++--
.../java/org/apache/fluss/server/kv/KvManager.java | 17 ++-
.../java/org/apache/fluss/server/kv/KvTablet.java | 30 +---
.../fluss/server/kv/prewrite/KvPreWriteBuffer.java | 17 ++-
.../java/org/apache/fluss/server/log/LocalLog.java | 17 +--
.../org/apache/fluss/server/log/LogManager.java | 17 ++-
.../org/apache/fluss/server/log/LogTablet.java | 9 +-
.../fluss/server/log/remote/LogTieringTask.java | 8 +-
.../server/metrics/group/BucketMetricGroup.java | 22 ++-
...TableMetricGroup.java => TableMetricGroup.java} | 106 +++++++-------
.../metrics/group/TabletServerMetricGroup.java | 153 ++++++++++++++++++---
.../org/apache/fluss/server/replica/Replica.java | 35 ++---
.../fluss/server/replica/ReplicaManager.java | 50 ++++---
.../apache/fluss/server/tablet/TabletServer.java | 5 +-
.../org/apache/fluss/server/kv/KvManagerTest.java | 16 ++-
.../org/apache/fluss/server/kv/KvTabletTest.java | 3 +
.../server/kv/prewrite/KvPreWriteBufferTest.java | 13 +-
.../org/apache/fluss/server/log/LocalLogTest.java | 2 +
.../apache/fluss/server/log/LogManagerTest.java | 22 ++-
.../org/apache/fluss/server/log/LogTabletTest.java | 3 +
.../server/metrics/group/TestingMetricGroups.java | 9 +-
.../fluss/server/replica/ReplicaTestBase.java | 15 +-
.../replica/fetcher/ReplicaFetcherThreadTest.java | 7 +-
.../maintenance/observability/monitor-metrics.md | 152 ++++++++++----------
24 files changed, 474 insertions(+), 284 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index a51820e54..7bd21b552 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -94,13 +94,8 @@ public class MetricNames {
public static final String FAILED_PREFIX_LOOKUP_REQUESTS_RATE =
"failedPrefixLookupRequestsPerSecond";
- // --------------------------------------------------------------------------------------------
- // metrics for table bucket
- // --------------------------------------------------------------------------------------------
-
// for replica
public static final String UNDER_REPLICATED = "underReplicated";
- public static final String IN_SYNC_REPLICAS = "inSyncReplicasCount";
public static final String UNDER_MIN_ISR = "underMinIsr";
public static final String AT_MIN_ISR = "atMinIsr";
public static final String ISR_EXPANDS_RATE = "isrExpandsPerSecond";
@@ -108,21 +103,28 @@ public class MetricNames {
public static final String FAILED_ISR_UPDATES_RATE =
"failedIsrUpdatesPerSecond";
// for log tablet
- public static final String LOG_NUM_SEGMENTS = "numSegments";
- public static final String LOG_END_OFFSET = "endOffset";
- public static final String LOG_SIZE = "size";
- public static final String LOG_FLUSH_RATE = "flushPerSecond";
- public static final String LOG_FLUSH_LATENCY_MS = "flushLatencyMs";
+ public static final String LOG_FLUSH_RATE = "logFlushPerSecond";
+ public static final String LOG_FLUSH_LATENCY_MS = "logFlushLatencyMs";
// for kv tablet
- public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
+ public static final String KV_FLUSH_RATE = "kvFlushPerSecond";
+ public static final String KV_FLUSH_LATENCY_MS = "kvFlushLatencyMs";
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE
=
"preWriteBufferTruncateAsDuplicatedPerSecond";
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
"preWriteBufferTruncateAsErrorPerSecond";
- public static final String KV_PRE_WRITE_BUFFER_FLUSH_RATE =
"preWriteBufferFlushPerSecond";
- public static final String KV_PRE_WRITE_BUFFER_FLUSH_LATENCY_MS =
- "preWriteBufferFlushLatencyMs";
+
+ // --------------------------------------------------------------------------------------------
+ // metrics for table bucket
+ // --------------------------------------------------------------------------------------------
+
+ // for log tablet
+ public static final String LOG_NUM_SEGMENTS = "numSegments";
+ public static final String LOG_END_OFFSET = "endOffset";
+ public static final String LOG_SIZE = "size";
+
+ // for kv tablet
+ public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
// --------------------------------------------------------------------------------------------
// metrics for rpc client
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index 22290e983..2d2764499 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -36,6 +36,7 @@ import org.apache.fluss.server.TabletManagerBase;
import org.apache.fluss.server.kv.rowmerger.RowMerger;
import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
@@ -69,6 +70,8 @@ public final class KvManager extends TabletManagerBase {
private static final Logger LOG = LoggerFactory.getLogger(KvManager.class);
private final LogManager logManager;
+ private final TabletServerMetricGroup serverMetricGroup;
+
private final ZooKeeperClient zkClient;
private final Map<TableBucket, KvTablet> currentKvs =
MapUtils.newConcurrentHashMap();
@@ -91,7 +94,8 @@ public final class KvManager extends TabletManagerBase {
Configuration conf,
ZooKeeperClient zkClient,
int recoveryThreadsPerDataDir,
- LogManager logManager)
+ LogManager logManager,
+ TabletServerMetricGroup tabletServerMetricGroup)
throws IOException {
super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
this.logManager = logManager;
@@ -100,10 +104,14 @@ public final class KvManager extends TabletManagerBase {
this.zkClient = zkClient;
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
this.remoteFileSystem = remoteKvDir.getFileSystem();
+ this.serverMetricGroup = tabletServerMetricGroup;
}
public static KvManager create(
- Configuration conf, ZooKeeperClient zkClient, LogManager
logManager)
+ Configuration conf,
+ ZooKeeperClient zkClient,
+ LogManager logManager,
+ TabletServerMetricGroup tabletServerMetricGroup)
throws IOException {
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -112,7 +120,8 @@ public final class KvManager extends TabletManagerBase {
conf,
zkClient,
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
- logManager);
+ logManager,
+ tabletServerMetricGroup);
}
public void startup() {
@@ -171,6 +180,7 @@ public final class KvManager extends TabletManagerBase {
logTablet,
tabletDir,
conf,
+ serverMetricGroup,
arrowBufferAllocator,
memorySegmentPool,
kvFormat,
@@ -279,6 +289,7 @@ public final class KvManager extends TabletManagerBase {
logTablet,
tabletDir,
conf,
+ serverMetricGroup,
arrowBufferAllocator,
memorySegmentPool,
tableInfo.getTableConfig().getKvFormat(),
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 4c3fde697..0437053af 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -29,9 +29,6 @@ import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.metrics.MeterView;
-import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.record.ChangeType;
import org.apache.fluss.record.KvRecord;
import org.apache.fluss.record.KvRecordBatch;
@@ -55,7 +52,7 @@ import org.apache.fluss.server.kv.wal.IndexWalBuilder;
import org.apache.fluss.server.kv.wal.WalBuilder;
import org.apache.fluss.server.log.LogAppendInfo;
import org.apache.fluss.server.log.LogTablet;
-import org.apache.fluss.server.metrics.group.BucketMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.utils.FatalErrorHandler;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.types.DataType;
@@ -124,6 +121,7 @@ public final class KvTablet {
TableBucket tableBucket,
LogTablet logTablet,
File kvTabletDir,
+ TabletServerMetricGroup serverMetricGroup,
RocksDBKv rocksDBKv,
long writeBatchSize,
LogFormat logFormat,
@@ -139,7 +137,7 @@ public final class KvTablet {
this.kvTabletDir = kvTabletDir;
this.rocksDBKv = rocksDBKv;
this.writeBatchSize = writeBatchSize;
- this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter());
+ this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter(),
serverMetricGroup);
this.logFormat = logFormat;
this.arrowWriterProvider = new ArrowWriterPool(arrowBufferAllocator);
this.memorySegmentPool = memorySegmentPool;
@@ -153,6 +151,7 @@ public final class KvTablet {
LogTablet logTablet,
File kvTabletDir,
Configuration serverConf,
+ TabletServerMetricGroup serverMetricGroup,
BufferAllocator arrowBufferAllocator,
MemorySegmentPool memorySegmentPool,
KvFormat kvFormat,
@@ -168,6 +167,7 @@ public final class KvTablet {
logTablet,
kvTabletDir,
serverConf,
+ serverMetricGroup,
arrowBufferAllocator,
memorySegmentPool,
kvFormat,
@@ -182,6 +182,7 @@ public final class KvTablet {
LogTablet logTablet,
File kvTabletDir,
Configuration serverConf,
+ TabletServerMetricGroup serverMetricGroup,
BufferAllocator arrowBufferAllocator,
MemorySegmentPool memorySegmentPool,
KvFormat kvFormat,
@@ -195,6 +196,7 @@ public final class KvTablet {
tableBucket,
logTablet,
kvTabletDir,
+ serverMetricGroup,
kv,
serverConf.get(ConfigOptions.KV_WRITE_BATCH_SIZE).getBytes(),
logTablet.getLogFormat(),
@@ -243,24 +245,6 @@ public final class KvTablet {
return flushedLogOffset;
}
- public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
- MetricGroup metricGroup = bucketMetricGroup.addGroup("kv");
-
- // about pre-write buffer.
- metricGroup.meter(
- MetricNames.KV_PRE_WRITE_BUFFER_FLUSH_RATE,
- new MeterView(kvPreWriteBuffer.getFlushCount()));
- metricGroup.histogram(
- MetricNames.KV_PRE_WRITE_BUFFER_FLUSH_LATENCY_MS,
- kvPreWriteBuffer.getFlushLatencyHistogram());
- metricGroup.meter(
- MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE,
- new
MeterView(kvPreWriteBuffer.getTruncateAsDuplicatedCount()));
- metricGroup.meter(
- MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE,
- new MeterView(kvPreWriteBuffer.getTruncateAsErrorCount()));
- }
-
/**
* Put the KvRecordBatch into the kv storage, and return the appended wal
log info.
*
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
index 7d53b2fe7..255d7a29e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
@@ -20,10 +20,9 @@ package org.apache.fluss.server.kv.prewrite;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.memory.MemorySegment;
import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
import org.apache.fluss.metrics.Histogram;
-import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.server.kv.KvBatchWriter;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.utils.MurmurHashUtils;
import javax.annotation.Nullable;
@@ -61,7 +60,7 @@ import static
org.apache.fluss.utils.UnsafeUtils.BYTE_ARRAY_BASE_OFFSET;
* <li>Buffer all the key-value pairs that are waiting for the corresponding
WAL to be persisted.
* And flush these key-value pairs whose WAL has been persisted to
underlying kv storage.
* <li>A temporary in-memory key-value buffer for put/get a key. Since Fluss
will lookup the
- * previous written data to generate CDC as WAL, it need a buffer to
buffer the data been
+ * previous written data to generate CDC as WAL, it needs a buffer to
buffer the data been
* written before but is still waiting for the WAL to be persisted
before flush to underlying
* kv storage.
* </ol>
@@ -104,14 +103,14 @@ public class KvPreWriteBuffer implements AutoCloseable {
// the max LSN in the buffer
private long maxLogSequenceNumber = -1;
- public KvPreWriteBuffer(KvBatchWriter kvBatchWriter) {
+ public KvPreWriteBuffer(
+ KvBatchWriter kvBatchWriter, TabletServerMetricGroup
serverMetricGroup) {
this.kvBatchWriter = kvBatchWriter;
- flushCount = new SimpleCounter();
- // consider won't flush frequently, we set a small window size
- flushLatencyHistogram = new DescriptiveStatisticsHistogram(5);
- truncateAsDuplicatedCount = new SimpleCounter();
- truncateAsErrorCount = new SimpleCounter();
+ flushCount = serverMetricGroup.kvFlushCount();
+ flushLatencyHistogram = serverMetricGroup.kvFlushLatencyHistogram();
+ truncateAsDuplicatedCount =
serverMetricGroup.kvTruncateAsDuplicatedCount();
+ truncateAsErrorCount = serverMetricGroup.kvTruncateAsErrorCount();
}
/**
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
index 8266c4ed5..061c83ebb 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
@@ -25,11 +25,10 @@ import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
import org.apache.fluss.metrics.Histogram;
-import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.record.FileLogProjection;
import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.FlussPaths;
@@ -90,6 +89,7 @@ public final class LocalLog {
public LocalLog(
File logTabletDir,
Configuration config,
+ TabletServerMetricGroup serverMetricGroup,
LogSegments segments,
long recoveryPoint,
LogOffsetMetadata nextOffsetMetadata,
@@ -105,9 +105,8 @@ public final class LocalLog {
this.logFormat = logFormat;
lastFlushedTime = new AtomicLong(System.currentTimeMillis());
- flushCount = new SimpleCounter();
- // consider won't flush frequently, we set a small window size
- flushLatencyHistogram = new DescriptiveStatisticsHistogram(5);
+ flushCount = serverMetricGroup.logFlushCount();
+ flushLatencyHistogram = serverMetricGroup.logFlushLatencyHistogram();
localLogStartOffset = segments.isEmpty() ? 0L :
segments.firstSegmentBaseOffset().get();
localMaxTimestamp =
segments.isEmpty() ? 0L :
segments.lastSegment().get().maxTimestampSoFar();
@@ -125,14 +124,6 @@ public final class LocalLog {
return recoveryPoint;
}
- Histogram getFlushLatencyHistogram() {
- return flushLatencyHistogram;
- }
-
- Counter getFlushCount() {
- return flushCount;
- }
-
/** The offset metadata of the next message that will be appended to the
log. */
@VisibleForTesting
LogOffsetMetadata getLocalLogEndOffsetMetadata() {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
index f881f388e..644eedfb4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
@@ -29,6 +29,7 @@ import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.TabletManagerBase;
import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.utils.FileUtils;
import org.apache.fluss.utils.FlussPaths;
@@ -87,6 +88,7 @@ public final class LogManager extends TabletManagerBase {
private final ZooKeeperClient zkClient;
private final Scheduler scheduler;
private final Clock clock;
+ private final TabletServerMetricGroup serverMetricGroup;
private final ReentrantLock logCreationOrDeletionLock = new
ReentrantLock();
private final Map<TableBucket, LogTablet> currentLogs =
MapUtils.newConcurrentHashMap();
@@ -100,19 +102,25 @@ public final class LogManager extends TabletManagerBase {
ZooKeeperClient zkClient,
int recoveryThreadsPerDataDir,
Scheduler scheduler,
- Clock clock)
+ Clock clock,
+ TabletServerMetricGroup serverMetricGroup)
throws Exception {
super(TabletType.LOG, dataDir, conf, recoveryThreadsPerDataDir);
this.zkClient = zkClient;
this.scheduler = scheduler;
this.clock = clock;
+ this.serverMetricGroup = serverMetricGroup;
createAndValidateDataDir(dataDir);
initializeCheckpointMaps();
}
public static LogManager create(
- Configuration conf, ZooKeeperClient zkClient, Scheduler scheduler,
Clock clock)
+ Configuration conf,
+ ZooKeeperClient zkClient,
+ Scheduler scheduler,
+ Clock clock,
+ TabletServerMetricGroup serverMetricGroup)
throws Exception {
String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -122,7 +130,8 @@ public final class LogManager extends TabletManagerBase {
zkClient,
conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
scheduler,
- clock);
+ clock,
+ serverMetricGroup);
}
public void startup() {
@@ -246,6 +255,7 @@ public final class LogManager extends TabletManagerBase {
tablePath,
tabletDir,
conf,
+ serverMetricGroup,
0L,
scheduler,
logFormat,
@@ -348,6 +358,7 @@ public final class LogManager extends TabletManagerBase {
physicalTablePath,
tabletDir,
conf,
+ serverMetricGroup,
logRecoveryPoint,
scheduler,
tableInfo.getTableConfig().getLogFormat(),
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 689fc122b..8eb19362f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -30,7 +30,6 @@ import org.apache.fluss.metadata.LogFormat;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.metrics.MeterView;
import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.record.DefaultLogRecordBatch;
@@ -41,6 +40,7 @@ import org.apache.fluss.record.LogRecords;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.server.log.LocalLog.SegmentDeletionReason;
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.concurrent.Scheduler;
@@ -277,6 +277,7 @@ public final class LogTablet {
PhysicalTablePath tablePath,
File tabletDir,
Configuration conf,
+ TabletServerMetricGroup serverMetricGroup,
long recoveryPoint,
Scheduler scheduler,
LogFormat logFormat,
@@ -313,6 +314,7 @@ public final class LogTablet {
new LocalLog(
tabletDir,
conf,
+ serverMetricGroup,
segments,
recoveryPoint,
offsets.getNextOffsetMetadata(),
@@ -338,11 +340,6 @@ public final class LogTablet {
MetricNames.LOG_NUM_SEGMENTS, () ->
localLog.getSegments().numberOfSegments());
metricGroup.gauge(MetricNames.LOG_END_OFFSET,
localLog::getLocalLogEndOffset);
metricGroup.gauge(MetricNames.LOG_SIZE, () ->
localLog.getSegments().sizeInBytes());
-
- // about flush
- metricGroup.meter(MetricNames.LOG_FLUSH_RATE, new
MeterView(localLog.getFlushCount()));
- metricGroup.histogram(
- MetricNames.LOG_FLUSH_LATENCY_MS,
localLog.getFlushLatencyHistogram());
}
public void updateLeaderEndOffsetSnapshot() {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index 4eace9cde..0e3666509 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -28,7 +28,7 @@ import
org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import org.apache.fluss.server.entity.CommitRemoteLogManifestData;
import org.apache.fluss.server.log.LogSegment;
import org.apache.fluss.server.log.LogTablet;
-import org.apache.fluss.server.metrics.group.PhysicalTableMetricGroup;
+import org.apache.fluss.server.metrics.group.TableMetricGroup;
import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.utils.clock.Clock;
@@ -121,7 +121,7 @@ public class LogTieringTask implements Runnable {
try {
LogTablet logTablet = replica.getLogTablet();
- PhysicalTableMetricGroup metricGroup = replica.tableMetrics();
+ TableMetricGroup metricGroup = replica.tableMetrics();
maybeUpdateCopiedOffset(logTablet);
// Get these candidate log segments to copy and these expired
remote log segments to
@@ -234,7 +234,7 @@ public class LogTieringTask implements Runnable {
LogTablet log,
List<EnrichedLogSegment> segments,
List<RemoteLogSegment> copiedSegments,
- PhysicalTableMetricGroup metricGroup)
+ TableMetricGroup metricGroup)
throws Exception {
long endOffset = -1;
for (EnrichedLogSegment enrichedSegment : segments) {
@@ -449,7 +449,7 @@ public class LogTieringTask implements Runnable {
/** Delete the remote log segment files. */
private void deleteRemoteLogSegmentFiles(
- List<RemoteLogSegment> remoteLogSegmentList,
PhysicalTableMetricGroup metricGroup) {
+ List<RemoteLogSegment> remoteLogSegmentList, TableMetricGroup
metricGroup) {
for (RemoteLogSegment remoteLogSegment : remoteLogSegmentList) {
try {
remoteLogStorage.deleteLogSegmentFiles(remoteLogSegment);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
index 9770a603d..fc8e281a5 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
@@ -21,22 +21,36 @@ import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
+import javax.annotation.Nullable;
+
import java.util.Map;
import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
/** Metrics for the table buckets with table as parent group. */
public class BucketMetricGroup extends AbstractMetricGroup {
-
+ // will be null if the bucket doesn't belong to a partition
+ private final @Nullable String partitionName;
private final int bucket;
- public BucketMetricGroup(MetricRegistry registry, int bucket,
PhysicalTableMetricGroup parent) {
+ public BucketMetricGroup(
+ MetricRegistry registry,
+ @Nullable String partitionName,
+ int bucket,
+ TableMetricGroup parent) {
super(registry, makeScope(parent, String.valueOf(bucket)), parent);
+ this.partitionName = partitionName;
this.bucket = bucket;
}
@Override
protected void putVariables(Map<String, String> variables) {
+ if (partitionName != null) {
+ variables.put("partition", partitionName);
+ } else {
+ // value of empty string indicates non-partitioned tables
+ variables.put("partition", "");
+ }
variables.put("bucket", String.valueOf(bucket));
}
@@ -45,7 +59,7 @@ public class BucketMetricGroup extends AbstractMetricGroup {
return "bucket";
}
- public PhysicalTableMetricGroup getPhysicalTableMetricGroup() {
- return (PhysicalTableMetricGroup) parent;
+ public TableMetricGroup getTableMetricGroup() {
+ return (TableMetricGroup) parent;
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/PhysicalTableMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
similarity index 81%
rename from
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/PhysicalTableMetricGroup.java
rename to
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
index 9a5370fac..7620bcbfd 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/PhysicalTableMetricGroup.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
@@ -17,7 +17,8 @@
package org.apache.fluss.server.metrics.group;
-import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.Counter;
import org.apache.fluss.metrics.MeterView;
@@ -35,34 +36,35 @@ import java.util.Map;
import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
/**
- * Metrics for the physical tables(tables or partitions) in server with {@link
- * TabletServerMetricGroup} as parent group.
+ * Metrics for the tables(tables or partitions) in server with {@link
TabletServerMetricGroup} as
+ * parent group.
*/
-public class PhysicalTableMetricGroup extends AbstractMetricGroup {
+public class TableMetricGroup extends AbstractMetricGroup {
- private final Map<Integer, BucketMetricGroup> buckets = new HashMap<>();
+ private final Map<TableBucket, BucketMetricGroup> buckets = new
HashMap<>();
- private final PhysicalTablePath physicalTablePath;
+ private final TablePath tablePath;
- // ---- metrics for log, when the table is for kv, it's for cdc log
+ // server-level metrics
+ private final TabletServerMetricGroup serverMetrics;
+
+ // table-level metrics for log, when the table is for kv, it's for cdc log
private final LogMetricGroup logMetrics;
- // ---- metrics for kv, will be null if the table isn't a kv table ----
+ // table-level metrics for kv, will be null if the table isn't a kv table
private final @Nullable KvMetricGroup kvMetrics;
- public PhysicalTableMetricGroup(
+ public TableMetricGroup(
MetricRegistry registry,
- PhysicalTablePath physicalTablePath,
+ TablePath tablePath,
boolean isKvTable,
TabletServerMetricGroup serverMetricGroup) {
super(
registry,
- makeScope(
- serverMetricGroup,
- physicalTablePath.getDatabaseName(),
- physicalTablePath.getTableName()),
+ makeScope(serverMetricGroup, tablePath.getDatabaseName(),
tablePath.getTableName()),
serverMetricGroup);
- this.physicalTablePath = physicalTablePath;
+ this.serverMetrics = serverMetricGroup;
+ this.tablePath = tablePath;
// if is kv table, create kv metrics
if (isKvTable) {
@@ -77,15 +79,8 @@ public class PhysicalTableMetricGroup extends
AbstractMetricGroup {
@Override
protected void putVariables(Map<String, String> variables) {
- variables.put("database", physicalTablePath.getDatabaseName());
- variables.put("table", physicalTablePath.getTableName());
-
- if (physicalTablePath.getPartitionName() != null) {
- variables.put("partition", physicalTablePath.getPartitionName());
- } else {
- // value of empty string indicates non-partitioned tables
- variables.put("partition", "");
- }
+ variables.put("database", tablePath.getDatabaseName());
+ variables.put("table", tablePath.getTableName());
}
@Override
@@ -94,16 +89,19 @@ public class PhysicalTableMetricGroup extends
AbstractMetricGroup {
return "table";
}
- public Counter logMessageIn() {
- return logMetrics.messagesIn;
+ public void incLogMessageIn(long n) {
+ logMetrics.messagesIn.inc(n);
+ serverMetrics.messageIn().inc(n);
}
- public Counter logBytesIn() {
- return logMetrics.bytesIn;
+ public void incLogBytesIn(long n) {
+ logMetrics.bytesIn.inc(n);
+ serverMetrics.bytesIn().inc(n);
}
- public Counter logBytesOut() {
- return logMetrics.bytesOut;
+ public void incLogBytesOut(long n) {
+ logMetrics.bytesOut.inc(n);
+ serverMetrics.bytesOut().inc(n);
}
public Counter totalFetchLogRequests() {
@@ -142,19 +140,21 @@ public class PhysicalTableMetricGroup extends
AbstractMetricGroup {
return logMetrics.remoteLogDeleteErrors;
}
- public Counter kvMessageIn() {
+ public void incKvMessageIn(long n) {
if (kvMetrics == null) {
- return NoOpCounter.INSTANCE;
+ NoOpCounter.INSTANCE.inc(n);
} else {
- return kvMetrics.messagesIn;
+ kvMetrics.messagesIn.inc(n);
+ serverMetrics.messageIn().inc(n);
}
}
- public Counter kvBytesIn() {
+ public void incKvBytesIn(long n) {
if (kvMetrics == null) {
- return NoOpCounter.INSTANCE;
+ NoOpCounter.INSTANCE.inc(n);
} else {
- return kvMetrics.bytesIn;
+ kvMetrics.bytesIn.inc(n);
+ serverMetrics.bytesIn().inc(n);
}
}
@@ -225,13 +225,17 @@ public class PhysicalTableMetricGroup extends
AbstractMetricGroup {
// ------------------------------------------------------------------------
// bucket groups
// ------------------------------------------------------------------------
- public BucketMetricGroup addBucketMetricGroup(int bucketId) {
+ public BucketMetricGroup addBucketMetricGroup(
+ @Nullable String partitionName, TableBucket tableBucket) {
return buckets.computeIfAbsent(
- bucketId, (bucket) -> new BucketMetricGroup(registry,
bucketId, this));
+ tableBucket,
+ (bucket) ->
+ new BucketMetricGroup(
+ registry, partitionName,
tableBucket.getBucket(), this));
}
- public void removeBucketMetricGroup(int bucketId) {
- BucketMetricGroup metricGroup = buckets.remove(bucketId);
+ public void removeBucketMetricGroup(TableBucket tableBucket) {
+ BucketMetricGroup metricGroup = buckets.remove(tableBucket);
metricGroup.close();
}
@@ -239,6 +243,10 @@ public class PhysicalTableMetricGroup extends
AbstractMetricGroup {
return buckets.size();
}
+ public TabletServerMetricGroup getServerMetricGroup() {
+ return (TabletServerMetricGroup) parent;
+ }
+
/** Metric group for specific kind of tablet of a table. */
private static class TabletMetricGroup extends AbstractMetricGroup {
private final TabletType tabletType;
@@ -248,12 +256,11 @@ public class PhysicalTableMetricGroup extends
AbstractMetricGroup {
protected final Counter bytesIn;
protected final Counter bytesOut;
- private TabletMetricGroup(
- PhysicalTableMetricGroup physicalTableMetricGroup, TabletType
tabletType) {
+ private TabletMetricGroup(TableMetricGroup tableMetricGroup,
TabletType tabletType) {
super(
- physicalTableMetricGroup.registry,
- makeScope(physicalTableMetricGroup, tabletType.name),
- physicalTableMetricGroup);
+ tableMetricGroup.registry,
+ makeScope(tableMetricGroup, tabletType.name),
+ tableMetricGroup);
this.tabletType = tabletType;
messagesIn = new ThreadSafeSimpleCounter();
@@ -293,9 +300,8 @@ public class PhysicalTableMetricGroup extends
AbstractMetricGroup {
private final Counter remoteLogDeleteRequests;
private final Counter remoteLogDeleteErrors;
- private LogMetricGroup(
- PhysicalTableMetricGroup physicalTableMetricGroup, TabletType
groupType) {
- super(physicalTableMetricGroup, groupType);
+ private LogMetricGroup(TableMetricGroup tableMetricGroup, TabletType
groupType) {
+ super(tableMetricGroup, groupType);
// for fetch log requests
totalFetchLogRequests = new ThreadSafeSimpleCounter();
meter(MetricNames.TOTAL_FETCH_LOG_REQUESTS_RATE, new
MeterView(totalFetchLogRequests));
@@ -350,8 +356,8 @@ public class PhysicalTableMetricGroup extends
AbstractMetricGroup {
private final Counter totalPrefixLookupRequests;
private final Counter failedPrefixLookupRequests;
- public KvMetricGroup(PhysicalTableMetricGroup
physicalTableMetricGroup) {
- super(physicalTableMetricGroup, TabletType.KV);
+ public KvMetricGroup(TableMetricGroup tableMetricGroup) {
+ super(tableMetricGroup, TabletType.KV);
// for lookup request
totalLookupRequests = new ThreadSafeSimpleCounter();
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
index ce20e533f..cb1b75ff3 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
@@ -18,12 +18,18 @@
package org.apache.fluss.server.metrics.group;
import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
+import org.apache.fluss.metrics.Histogram;
import org.apache.fluss.metrics.MeterView;
import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.utils.MapUtils;
@@ -33,8 +39,9 @@ import java.util.Map;
public class TabletServerMetricGroup extends AbstractMetricGroup {
private static final String NAME = "tabletserver";
+ private static final int WINDOW_SIZE = 1024;
- private final Map<PhysicalTablePath, PhysicalTableMetricGroup>
metricGroupByPhysicalTable =
+ private final Map<TablePath, TableMetricGroup> metricGroupByTable =
MapUtils.newConcurrentHashMap();
protected final String clusterId;
@@ -49,6 +56,26 @@ public class TabletServerMetricGroup extends
AbstractMetricGroup {
private final Counter delayedFetchFromFollowerExpireCount;
private final Counter delayedFetchFromClientExpireCount;
+ // aggregated metrics
+ private final Counter messagesIn;
+ private final Counter bytesIn;
+ private final Counter bytesOut;
+
+ // aggregated log metrics
+ private final Counter logFlushCount;
+ private final Histogram logFlushLatencyHistogram;
+
+ // aggregated kv metrics
+ private final Counter kvFlushCount;
+ private final Histogram kvFlushLatencyHistogram;
+ private final Counter kvTruncateAsDuplicatedCount;
+ private final Counter kvTruncateAsErrorCount;
+
+ // aggregated replica metrics
+ private final Counter isrShrinks;
+ private final Counter isrExpands;
+ private final Counter failedIsrUpdates;
+
public TabletServerMetricGroup(
MetricRegistry registry, String clusterId, String rack, String
hostname, int serverId) {
super(registry, new String[] {clusterId, hostname, NAME}, null);
@@ -72,6 +99,42 @@ public class TabletServerMetricGroup extends
AbstractMetricGroup {
meter(
MetricNames.DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE,
new MeterView(delayedFetchFromClientExpireCount));
+
+ messagesIn = new ThreadSafeSimpleCounter();
+ meter(MetricNames.MESSAGES_IN_RATE, new MeterView(messagesIn));
+ bytesIn = new ThreadSafeSimpleCounter();
+ meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
+ bytesOut = new ThreadSafeSimpleCounter();
+ meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
+
+ MetricGroup logMetricGroup = this.addGroup("log");
+ // about flush
+ logFlushCount = new SimpleCounter();
+ logMetricGroup.meter(MetricNames.LOG_FLUSH_RATE, new
MeterView(logFlushCount));
+ logFlushLatencyHistogram = new
DescriptiveStatisticsHistogram(WINDOW_SIZE);
+ logMetricGroup.histogram(MetricNames.LOG_FLUSH_LATENCY_MS,
logFlushLatencyHistogram);
+
+ // about pre-write buffer.
+ kvFlushCount = new SimpleCounter();
+ meter(MetricNames.KV_FLUSH_RATE, new MeterView(kvFlushCount));
+ kvFlushLatencyHistogram = new
DescriptiveStatisticsHistogram(WINDOW_SIZE);
+ histogram(MetricNames.KV_FLUSH_LATENCY_MS, kvFlushLatencyHistogram);
+ kvTruncateAsDuplicatedCount = new SimpleCounter();
+ meter(
+ MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE,
+ new MeterView(kvTruncateAsDuplicatedCount));
+ kvTruncateAsErrorCount = new SimpleCounter();
+ meter(
+ MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE,
+ new MeterView(kvTruncateAsErrorCount));
+
+ // replica metrics
+ isrExpands = new SimpleCounter();
+ meter(MetricNames.ISR_EXPANDS_RATE, new MeterView(isrExpands));
+ isrShrinks = new SimpleCounter();
+ meter(MetricNames.ISR_SHRINKS_RATE, new MeterView(isrShrinks));
+ failedIsrUpdates = new SimpleCounter();
+ meter(MetricNames.FAILED_ISR_UPDATES_RATE, new
MeterView(failedIsrUpdates));
}
@Override
@@ -112,33 +175,79 @@ public class TabletServerMetricGroup extends
AbstractMetricGroup {
return delayedFetchFromClientExpireCount;
}
+ public Counter messageIn() {
+ return messagesIn;
+ }
+
+ public Counter bytesIn() {
+ return bytesIn;
+ }
+
+ public Counter bytesOut() {
+ return bytesOut;
+ }
+
+ public Counter logFlushCount() {
+ return logFlushCount;
+ }
+
+ public Histogram logFlushLatencyHistogram() {
+ return logFlushLatencyHistogram;
+ }
+
+ public Counter kvFlushCount() {
+ return kvFlushCount;
+ }
+
+ public Histogram kvFlushLatencyHistogram() {
+ return kvFlushLatencyHistogram;
+ }
+
+ public Counter kvTruncateAsDuplicatedCount() {
+ return kvTruncateAsDuplicatedCount;
+ }
+
+ public Counter kvTruncateAsErrorCount() {
+ return kvTruncateAsErrorCount;
+ }
+
+ public Counter isrShrinks() {
+ return isrShrinks;
+ }
+
+ public Counter isrExpands() {
+ return isrExpands;
+ }
+
+ public Counter failedIsrUpdates() {
+ return failedIsrUpdates;
+ }
+
// ------------------------------------------------------------------------
// table buckets groups
// ------------------------------------------------------------------------
- public BucketMetricGroup addPhysicalTableBucketMetricGroup(
- PhysicalTablePath physicalTablePath, int bucket, boolean
isKvTable) {
- PhysicalTableMetricGroup physicalTableMetricGroup =
- metricGroupByPhysicalTable.computeIfAbsent(
- physicalTablePath,
- table ->
- new PhysicalTableMetricGroup(
- registry, physicalTablePath,
isKvTable, this));
- return physicalTableMetricGroup.addBucketMetricGroup(bucket);
- }
-
- public void removeTableBucketMetricGroup(PhysicalTablePath
physicalTablePath, int bucket) {
- // get the metric group of the physical table
- PhysicalTableMetricGroup physicalTableMetricGroup =
- metricGroupByPhysicalTable.get(physicalTablePath);
- // if get the physical table metric group
- if (physicalTableMetricGroup != null) {
+ public BucketMetricGroup addTableBucketMetricGroup(
+ PhysicalTablePath physicalTablePath, TableBucket bucket, boolean
isKvTable) {
+ TablePath tablePath = physicalTablePath.getTablePath();
+ TableMetricGroup tableMetricGroup =
+ metricGroupByTable.computeIfAbsent(
+ tablePath,
+ table -> new TableMetricGroup(registry, tablePath,
isKvTable, this));
+ return
tableMetricGroup.addBucketMetricGroup(physicalTablePath.getPartitionName(),
bucket);
+ }
+
+ public void removeTableBucketMetricGroup(TablePath tablePath, TableBucket
bucket) {
+ // get the metric group of the table
+ TableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath);
+ // only proceed if the table metric group exists
+ if (tableMetricGroup != null) {
// remove the bucket metric group
- physicalTableMetricGroup.removeBucketMetricGroup(bucket);
+ tableMetricGroup.removeBucketMetricGroup(bucket);
// if no any bucket groups remain in the physical table metrics
group,
// close and remove the physical table metric group
- if (physicalTableMetricGroup.bucketGroupsCount() == 0) {
- physicalTableMetricGroup.close();
- metricGroupByPhysicalTable.remove(physicalTablePath);
+ if (tableMetricGroup.bucketGroupsCount() == 0) {
+ tableMetricGroup.close();
+ metricGroupByTable.remove(tablePath);
}
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index e86a29b1d..feb657201 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -38,9 +38,6 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.MeterView;
-import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.metrics.SimpleCounter;
import org.apache.fluss.record.DefaultValueRecordBatch;
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.LogRecords;
@@ -76,7 +73,8 @@ import
org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
import org.apache.fluss.server.log.remote.RemoteLogManager;
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
-import org.apache.fluss.server.metrics.group.PhysicalTableMetricGroup;
+import org.apache.fluss.server.metrics.group.TableMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.replica.delay.DelayedFetchLog;
import org.apache.fluss.server.replica.delay.DelayedOperationManager;
import org.apache.fluss.server.replica.delay.DelayedTableBucketKey;
@@ -246,19 +244,12 @@ public final class Replica {
}
private void registerMetrics() {
- bucketMetricGroup.gauge(MetricNames.UNDER_REPLICATED, () ->
isUnderReplicated() ? 1 : 0);
- bucketMetricGroup.gauge(
- MetricNames.IN_SYNC_REPLICAS, () -> isLeader() ?
isrState.isr().size() : 0);
- bucketMetricGroup.gauge(MetricNames.UNDER_MIN_ISR, () ->
isUnderMinIsr() ? 1 : 0);
- bucketMetricGroup.gauge(MetricNames.AT_MIN_ISR, () -> isAtMinIsr() ? 1
: 0);
-
- isrExpands = new SimpleCounter();
- bucketMetricGroup.meter(MetricNames.ISR_EXPANDS_RATE, new
MeterView(isrExpands));
- isrShrinks = new SimpleCounter();
- bucketMetricGroup.meter(MetricNames.ISR_SHRINKS_RATE, new
MeterView(isrShrinks));
- failedIsrUpdates = new SimpleCounter();
- bucketMetricGroup.meter(
- MetricNames.FAILED_ISR_UPDATES_RATE, new
MeterView(failedIsrUpdates));
+ // resolve the server metric group via the parent chain: bucket -> table -> tabletServer
+ TabletServerMetricGroup serverMetrics =
+ bucketMetricGroup.getTableMetricGroup().getServerMetricGroup();
+ isrExpands = serverMetrics.isrExpands();
+ isrShrinks = serverMetrics.isrShrinks();
+ failedIsrUpdates = serverMetrics.failedIsrUpdates();
}
public boolean isKvTable() {
@@ -349,8 +340,8 @@ public final class Replica {
return bucketMetricGroup;
}
- public PhysicalTableMetricGroup tableMetrics() {
- return bucketMetricGroup.getPhysicalTableMetricGroup();
+ public TableMetricGroup tableMetrics() {
+ return bucketMetricGroup.getTableMetricGroup();
}
public void makeLeader(NotifyLeaderAndIsrData data) throws IOException {
@@ -637,8 +628,6 @@ public final class Replica {
arrowCompressionInfo);
}
- kvTablet.registerMetrics(bucketMetricGroup);
-
logTablet.updateMinRetainOffset(restoreStartOffset);
recoverKvTablet(restoreStartOffset);
} catch (Exception e) {
@@ -1749,12 +1738,12 @@ public final class Replica {
}
}
- private boolean isUnderReplicated() {
+ public boolean isUnderReplicated() {
// is leader and isr size less than numReplicas
return isLeader() && isrState.isr().size() <
tableConfig.getReplicationFactor();
}
- private boolean isUnderMinIsr() {
+ public boolean isUnderMinIsr() {
return isLeader() && isrState.isr().size() < minInSyncReplicas;
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 26b31bdd5..692bd4c2a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -81,7 +81,7 @@ import org.apache.fluss.server.log.remote.RemoteLogManager;
import org.apache.fluss.server.metadata.ClusterMetadata;
import org.apache.fluss.server.metadata.TabletServerMetadataCache;
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
-import org.apache.fluss.server.metrics.group.PhysicalTableMetricGroup;
+import org.apache.fluss.server.metrics.group.TableMetricGroup;
import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
import org.apache.fluss.server.replica.delay.DelayedFetchLog;
import org.apache.fluss.server.replica.delay.DelayedFetchLog.FetchBucketStatus;
@@ -302,6 +302,10 @@ public class ReplicaManager {
serverMetricGroup.gauge(MetricNames.DELAYED_WRITE_COUNT,
delayedWriteManager::numDelayed);
serverMetricGroup.gauge(
MetricNames.DELAYED_FETCH_COUNT,
delayedFetchLogManager::numDelayed);
+
+ serverMetricGroup.gauge(MetricNames.UNDER_REPLICATED,
this::underReplicatedCount);
+ serverMetricGroup.gauge(MetricNames.UNDER_MIN_ISR,
this::underMinIsrCount);
+ serverMetricGroup.gauge(MetricNames.AT_MIN_ISR, this::atMinIsrCount);
}
private Stream<Replica> onlineReplicas() {
@@ -318,6 +322,18 @@ public class ReplicaManager {
.map(t -> (Replica) t.get());
}
+ private long underReplicatedCount() {
+ return onlineReplicas().filter(Replica::isUnderReplicated).count();
+ }
+
+ private long underMinIsrCount() {
+ return onlineReplicas().filter(Replica::isUnderMinIsr).count();
+ }
+
+ private long atMinIsrCount() {
+ return onlineReplicas().filter(Replica::isAtMinIsr).count();
+ }
+
private int writerIdCount() {
return onlineReplicas().map(Replica::writerIdCount).reduce(0,
Integer::sum);
}
@@ -478,7 +494,7 @@ public class ReplicaManager {
Consumer<Map<TableBucket, LookupResultForBucket>>
responseCallback) {
Map<TableBucket, LookupResultForBucket> lookupResultForBucketMap = new
HashMap<>();
long startTime = System.currentTimeMillis();
- PhysicalTableMetricGroup tableMetrics = null;
+ TableMetricGroup tableMetrics = null;
for (Map.Entry<TableBucket, List<byte[]>> entry :
entriesPerBucket.entrySet()) {
TableBucket tb = entry.getKey();
try {
@@ -509,7 +525,7 @@ public class ReplicaManager {
public void prefixLookups(
Map<TableBucket, List<byte[]>> entriesPerBucket,
Consumer<Map<TableBucket, PrefixLookupResultForBucket>>
responseCallback) {
- PhysicalTableMetricGroup tableMetrics = null;
+ TableMetricGroup tableMetrics = null;
Map<TableBucket, PrefixLookupResultForBucket> result = new HashMap<>();
for (Map.Entry<TableBucket, List<byte[]>> entry :
entriesPerBucket.entrySet()) {
TableBucket tb = entry.getKey();
@@ -862,7 +878,7 @@ public class ReplicaManager {
Map<TableBucket, ProduceLogResultForBucket> resultForBucketMap = new
HashMap<>();
for (Map.Entry<TableBucket, MemoryLogRecords> entry :
entriesPerBucket.entrySet()) {
TableBucket tb = entry.getKey();
- PhysicalTableMetricGroup tableMetrics = null;
+ TableMetricGroup tableMetrics = null;
try {
Replica replica = getReplicaOrException(tb);
tableMetrics = replica.tableMetrics();
@@ -881,8 +897,8 @@ public class ReplicaManager {
resultForBucketMap.put(
tb,
new ProduceLogResultForBucket(tb, baseOffset,
appendInfo.lastOffset() + 1));
- tableMetrics.logBytesIn().inc(appendInfo.validBytes());
- tableMetrics.logMessageIn().inc(appendInfo.numMessages());
+ tableMetrics.incLogBytesIn(appendInfo.validBytes());
+ tableMetrics.incLogMessageIn(appendInfo.numMessages());
} catch (Exception e) {
if (isUnexpectedException(e)) {
LOG.error("Error append records to local log on replica
{}", tb, e);
@@ -908,7 +924,7 @@ public class ReplicaManager {
Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new
HashMap<>();
for (Map.Entry<TableBucket, KvRecordBatch> entry :
entriesPerBucket.entrySet()) {
TableBucket tb = entry.getKey();
- PhysicalTableMetricGroup tableMetrics = null;
+ TableMetricGroup tableMetrics = null;
try {
LOG.trace("Put records to local kv tablet for table bucket
{}", tb);
Replica replica = getReplicaOrException(tb);
@@ -925,11 +941,11 @@ public class ReplicaManager {
tb, new PutKvResultForBucket(tb,
appendInfo.lastOffset() + 1));
// metric for kv
-
tableMetrics.kvMessageIn().inc(entry.getValue().getRecordCount());
- tableMetrics.kvBytesIn().inc(entry.getValue().sizeInBytes());
+ tableMetrics.incKvMessageIn(entry.getValue().getRecordCount());
+ tableMetrics.incKvBytesIn(entry.getValue().sizeInBytes());
// metric for cdc log of kv
- tableMetrics.logBytesIn().inc(appendInfo.validBytes());
- tableMetrics.logMessageIn().inc(appendInfo.numMessages());
+ tableMetrics.incLogBytesIn(appendInfo.validBytes());
+ tableMetrics.incLogMessageIn(appendInfo.numMessages());
} catch (Exception e) {
if (isUnexpectedException(e)) {
LOG.error("Error put records to local kv on replica {}",
tb, e);
@@ -953,7 +969,7 @@ public class ReplicaManager {
int limit,
Consumer<LimitScanResultForBucket> responseCallback) {
LimitScanResultForBucket limitScanResultForBucket;
- PhysicalTableMetricGroup tableMetrics = null;
+ TableMetricGroup tableMetrics = null;
try {
Replica replica = getReplicaOrException(tableBucket);
tableMetrics = replica.tableMetrics();
@@ -988,7 +1004,7 @@ public class ReplicaManager {
int limitBytes = fetchParams.maxFetchBytes();
for (Map.Entry<TableBucket, FetchReqInfo> entry :
bucketFetchInfo.entrySet()) {
TableBucket tb = entry.getKey();
- PhysicalTableMetricGroup tableMetrics = null;
+ TableMetricGroup tableMetrics = null;
Replica replica = null;
FetchReqInfo fetchReqInfo = entry.getValue();
long fetchOffset = fetchReqInfo.getFetchOffset();
@@ -1031,7 +1047,7 @@ public class ReplicaManager {
if (isFromFollower) {
serverMetricGroup.replicationBytesOut().inc(recordBatchSize);
} else {
- tableMetrics.logBytesOut().inc(recordBatchSize);
+ tableMetrics.incLogBytesOut(recordBatchSize);
}
} catch (Exception e) {
if (isUnexpectedException(e)) {
@@ -1382,7 +1398,7 @@ public class ReplicaManager {
if (delete) {
if (allReplicas.remove(tb) != null) {
serverMetricGroup.removeTableBucketMetricGroup(
- replicaToDelete.getPhysicalTablePath(),
tb.getBucket());
+
replicaToDelete.getPhysicalTablePath().getTablePath(), tb);
replicaToDelete.delete();
Path tabletParentDir =
replicaToDelete.getTabletParentDir();
if (tb.getPartitionId() != null) {
@@ -1459,8 +1475,8 @@ public class ReplicaManager {
TableInfo tableInfo = getTableInfo(zkClient, tablePath);
boolean isKvTable = tableInfo.hasPrimaryKey();
BucketMetricGroup bucketMetricGroup =
- serverMetricGroup.addPhysicalTableBucketMetricGroup(
- physicalTablePath, tb.getBucket(), isKvTable);
+ serverMetricGroup.addTableBucketMetricGroup(
+ physicalTablePath, tb, isKvTable);
Replica replica =
new Replica(
physicalTablePath,
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 9f8db8000..0e593c2c8 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -189,10 +189,11 @@ public class TabletServer extends ServerBase {
this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
scheduler.startup();
- this.logManager = LogManager.create(conf, zkClient, scheduler,
clock);
+ this.logManager =
+ LogManager.create(conf, zkClient, scheduler, clock,
tabletServerMetricGroup);
logManager.startup();
- this.kvManager = KvManager.create(conf, zkClient, logManager);
+ this.kvManager = KvManager.create(conf, zkClient, logManager,
tabletServerMetricGroup);
kvManager.startup();
this.authorizer = AuthorizerLoader.createAuthorizer(conf,
zkClient, pluginManager);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java
index 714adf545..7627e0ba3 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.fluss.record.TestData;
import org.apache.fluss.row.encode.ValueEncoder;
import org.apache.fluss.server.log.LogManager;
import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.apache.fluss.server.zk.NOPErrorHandler;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperExtension;
@@ -110,8 +111,15 @@ final class KvManagerTest {
// we need a log manager for kv manager
logManager =
- LogManager.create(conf, zkClient, new FlussScheduler(1),
SystemClock.getInstance());
- kvManager = KvManager.create(conf, zkClient, logManager);
+ LogManager.create(
+ conf,
+ zkClient,
+ new FlussScheduler(1),
+ SystemClock.getInstance(),
+ TestingMetricGroups.TABLET_SERVER_METRICS);
+ kvManager =
+ KvManager.create(
+ conf, zkClient, logManager,
TestingMetricGroups.TABLET_SERVER_METRICS);
kvManager.startup();
}
@@ -171,7 +179,9 @@ final class KvManagerTest {
// restart
kvManager.shutdown();
- kvManager = KvManager.create(conf, zkClient, logManager);
+ kvManager =
+ KvManager.create(
+ conf, zkClient, logManager,
TestingMetricGroups.TABLET_SERVER_METRICS);
kvManager.startup();
kv1 = getOrCreateKv(tablePath1, partitionName, tableBucket1);
kv2 = getOrCreateKv(tablePath2, partitionName, tableBucket2);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 6734a32d5..b9d9df99b 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -48,6 +48,7 @@ import org.apache.fluss.server.log.FetchIsolation;
import org.apache.fluss.server.log.LogAppendInfo;
import org.apache.fluss.server.log.LogTablet;
import org.apache.fluss.server.log.LogTestUtils;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.types.RowType;
@@ -145,6 +146,7 @@ class KvTabletTest {
tablePath,
logTabletDir,
conf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
0,
new FlussScheduler(1),
LogFormat.ARROW,
@@ -173,6 +175,7 @@ class KvTabletTest {
logTablet,
tmpKvDir,
conf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
new RootAllocator(Long.MAX_VALUE),
new TestingMemorySegmentPool(10 * 1024),
KvFormat.COMPACTED,
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
index 9c6418a4d..b39ec39ac 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv.prewrite;
import org.apache.fluss.server.kv.KvBatchWriter;
import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.junit.jupiter.api.Test;
@@ -32,7 +33,9 @@ class KvPreWriteBufferTest {
@Test
void testIllegalLSN() {
- KvPreWriteBuffer buffer = new KvPreWriteBuffer(new NopKvBatchWriter());
+ KvPreWriteBuffer buffer =
+ new KvPreWriteBuffer(
+ new NopKvBatchWriter(),
TestingMetricGroups.TABLET_SERVER_METRICS);
bufferPut(buffer, "key1", "value1", 1);
bufferDelete(buffer, "key1", 3);
@@ -51,7 +54,9 @@ class KvPreWriteBufferTest {
@Test
void testWriteAndFlush() throws Exception {
- KvPreWriteBuffer buffer = new KvPreWriteBuffer(new NopKvBatchWriter());
+ KvPreWriteBuffer buffer =
+ new KvPreWriteBuffer(
+ new NopKvBatchWriter(),
TestingMetricGroups.TABLET_SERVER_METRICS);
int elementCount = 0;
// put a series of kv entries
@@ -132,7 +137,9 @@ class KvPreWriteBufferTest {
@Test
void testTruncate() {
- KvPreWriteBuffer buffer = new KvPreWriteBuffer(new NopKvBatchWriter());
+ KvPreWriteBuffer buffer =
+ new KvPreWriteBuffer(
+ new NopKvBatchWriter(),
TestingMetricGroups.TABLET_SERVER_METRICS);
int elementCount = 0;
// put a series of kv entries
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/LocalLogTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/LocalLogTest.java
index 37bf57b4f..ad3d0c6ca 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LocalLogTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LocalLogTest.java
@@ -24,6 +24,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.record.LogTestBase;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.server.log.LocalLog.SegmentDeletionReason;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -408,6 +409,7 @@ final class LocalLogTest extends LogTestBase {
return new LocalLog(
dir,
logConf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
segments,
recoverPoint,
nextOffsetMetadata,
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
index 5ae6b0f22..2bbdd2a1c 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.record.LogTestBase;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.apache.fluss.server.zk.NOPErrorHandler;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperExtension;
@@ -100,7 +101,12 @@ final class LogManagerTest extends LogTestBase {
registerTableInZkClient();
logManager =
- LogManager.create(conf, zkClient, new FlussScheduler(1),
SystemClock.getInstance());
+ LogManager.create(
+ conf,
+ zkClient,
+ new FlussScheduler(1),
+ SystemClock.getInstance(),
+ TestingMetricGroups.TABLET_SERVER_METRICS);
logManager.startup();
}
@@ -194,7 +200,12 @@ final class LogManagerTest extends LogTestBase {
logManager = null;
LogManager newLogManager =
- LogManager.create(conf, zkClient, new FlussScheduler(1),
SystemClock.getInstance());
+ LogManager.create(
+ conf,
+ zkClient,
+ new FlussScheduler(1),
+ SystemClock.getInstance(),
+ TestingMetricGroups.TABLET_SERVER_METRICS);
newLogManager.startup();
logManager = newLogManager;
log1 = getOrCreateLog(tablePath1, null, tableBucket1);
@@ -234,7 +245,12 @@ final class LogManagerTest extends LogTestBase {
assertThat(new File(dataDir, CLEAN_SHUTDOWN_FILE).exists()).isTrue();
LogManager newLogManager =
- LogManager.create(conf, zkClient, new FlussScheduler(1),
SystemClock.getInstance());
+ LogManager.create(
+ conf,
+ zkClient,
+ new FlussScheduler(1),
+ SystemClock.getInstance(),
+ TestingMetricGroups.TABLET_SERVER_METRICS);
assertThat(new File(dataDir, CLEAN_SHUTDOWN_FILE).exists()).isTrue();
newLogManager.startup();
logManager = newLogManager;
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
index 468187ded..695dab198 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
@@ -28,6 +28,7 @@ import org.apache.fluss.record.LogRecordBatch;
import org.apache.fluss.record.LogRecordReadContext;
import org.apache.fluss.record.LogTestBase;
import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.CloseableIterator;
import org.apache.fluss.utils.clock.SystemClock;
@@ -89,6 +90,7 @@ final class LogTabletTest extends LogTestBase {
PhysicalTablePath.of(DATA1_TABLE_PATH),
logDir,
conf,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
0,
scheduler,
LogFormat.ARROW,
@@ -491,6 +493,7 @@ final class LogTabletTest extends LogTestBase {
PhysicalTablePath.of(DATA1_TABLE_PATH),
logDir,
config,
+ TestingMetricGroups.TABLET_SERVER_METRICS,
0,
scheduler,
LogFormat.ARROW,
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
index d3e0a5726..15d1316b5 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
@@ -17,7 +17,6 @@
package org.apache.fluss.server.metrics.group;
-import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.registry.NOPMetricRegistry;
@@ -30,13 +29,13 @@ public class TestingMetricGroups {
public static final CoordinatorMetricGroup COORDINATOR_METRICS =
new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1",
"host", "0");
- public static final PhysicalTableMetricGroup TABLE_METRICS =
- new PhysicalTableMetricGroup(
+ public static final TableMetricGroup TABLE_METRICS =
+ new TableMetricGroup(
NOPMetricRegistry.INSTANCE,
- PhysicalTablePath.of(TablePath.of("mydb", "mytable"),
null),
+ TablePath.of("mydb", "mytable"),
false,
TABLET_SERVER_METRICS);
public static final BucketMetricGroup BUCKET_METRICS =
- new BucketMetricGroup(NOPMetricRegistry.INSTANCE, 0,
TABLE_METRICS);
+ new BucketMetricGroup(NOPMetricRegistry.INSTANCE, null, 0,
TABLE_METRICS);
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 8f0b271bb..36862c4e5 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -178,10 +178,18 @@ public class ReplicaTestBase {
scheduler.startup();
manualClock = new ManualClock(System.currentTimeMillis());
- logManager = LogManager.create(conf, zkClient, scheduler, manualClock);
+ logManager =
+ LogManager.create(
+ conf,
+ zkClient,
+ scheduler,
+ manualClock,
+ TestingMetricGroups.TABLET_SERVER_METRICS);
logManager.startup();
- kvManager = KvManager.create(conf, zkClient, logManager);
+ kvManager =
+ KvManager.create(
+ conf, zkClient, logManager,
TestingMetricGroups.TABLET_SERVER_METRICS);
kvManager.startup();
serverMetadataCache =
@@ -436,8 +444,7 @@ public class ReplicaTestBase {
BucketMetricGroup metricGroup =
replicaManager
.getServerMetricGroup()
- .addPhysicalTableBucketMetricGroup(
- physicalTablePath, tableBucket.getBucket(),
isPkTable);
+ .addTableBucketMetricGroup(physicalTablePath,
tableBucket, isPkTable);
return new Replica(
physicalTablePath,
tableBucket,
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index 777999f7e..a982ff058 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -323,7 +323,12 @@ public class ReplicaFetcherThreadTest {
scheduler.startup();
LogManager logManager =
- LogManager.create(conf, zkClient, scheduler,
SystemClock.getInstance());
+ LogManager.create(
+ conf,
+ zkClient,
+ scheduler,
+ SystemClock.getInstance(),
+ TestingMetricGroups.TABLET_SERVER_METRICS);
logManager.startup();
ReplicaManager replicaManager =
new TestingReplicaManager(
diff --git a/website/docs/maintenance/observability/monitor-metrics.md
b/website/docs/maintenance/observability/monitor-metrics.md
index 0aa996364..ac351d510 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -357,8 +357,23 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="10"><strong>tabletserver</strong></th>
- <td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="10">-</td>
+ <th rowspan="25"><strong>tabletserver</strong></th>
+ <td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="25">-</td>
+ <td>messagesInPerSecond</td>
+ <td>The number of messages written per second to this server.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>bytesInPerSecond</td>
+ <td>The number of bytes written per second to this server.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>bytesOutPerSecond</td>
+ <td>The number of bytes read per second from this server.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
<td>replicationBytesInPerSecond</td>
<td>The bytes of data write into follower replica for data sync.</td>
<td>Meter</td>
@@ -408,6 +423,66 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>The delayed fetch log operation from client expire count per second
in this TabletServer.</td>
<td>Meter</td>
</tr>
+ <tr>
+ <td>underMinIsr</td>
+ <td>The number of buckets that are under min isr in this server.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>underReplicated</td>
+ <td>The number of buckets that are under the replication factor in this
server.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>atMinIsr</td>
+ <td>The number of buckets that are at min isr in this server.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>isrExpandsPerSecond</td>
+ <td>The number of isr expands per second.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>isrShrinksPerSecond</td>
+ <td>The number of isr shrinks per second.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>failedIsrUpdatesPerSecond</td>
+ <td>The failed isr updates per second.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>logFlushPerSecond</td>
+ <td>The log flush count per second.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>logFlushLatencyMs</td>
+ <td>The log flush latency in ms.</td>
+ <td>Histogram</td>
+ </tr>
+ <tr>
+ <td>kvFlushPerSecond</td>
+ <td>The kv pre-write buffer flush to underlying RocksDB count per
second.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>kvFlushLatencyMs</td>
+ <td>The kv pre-write buffer flush to underlying RocksDB latency in
ms.</td>
+ <td>Histogram</td>
+ </tr>
+ <tr>
+ <td>preWriteBufferTruncateAsDuplicatedPerSecond</td>
+ <td>The number of kv pre-write buffer truncations per second caused by
duplicated batches.</td>
+ <td>Meter</td>
+ </tr>
+ <tr>
+ <td>preWriteBufferTruncateAsErrorPerSecond</td>
+ <td>The number of kv pre-write buffer truncations per second caused by
errors that occurred when writing cdc to log.</td>
+ <td>Meter</td>
+ </tr>
</tbody>
</table>
@@ -531,10 +606,10 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="39"><strong>tabletserver</strong></th>
+ <th rowspan="30"><strong>tabletserver</strong></th>
<td rowspan="20">table</td>
<td>messagesInPerSecond</td>
- <td>The number of messages written per second to this table</td>
+ <td>The number of messages written per second to this table.</td>
<td>Meter</td>
</tr>
<tr>
@@ -631,45 +706,9 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>remoteLogDeleteErrorPerSecond</td>
<td>The number of failed delete remote log requests to delete remote log
after log ttl per second.</td>
<td>Meter</td>
- </tr>
- <tr>
- <td rowspan="7">table_bucket</td>
- <td>inSyncReplicasCount</td>
- <td>The inSync replicas count of this table bucket.</td>
- <td>Gauge</td>
- </tr>
- <tr>
- <td>underMinIsr</td>
- <td>If this bucket is under min isr, this value is 1, otherwise 0.</td>
- <td>Gauge</td>
- </tr>
- <tr>
- <td>underReplicated</td>
- <td>If this bucket is under replication factor, this value is 1,
otherwise 0.</td>
- <td>Gauge</td>
- </tr>
- <tr>
- <td>atMinIsr</td>
- <td>If this bucket is at min isr, this value is 1, otherwise 0.</td>
- <td>Gauge</td>
- </tr>
- <tr>
- <td>isrExpandsPerSecond</td>
- <td>The number of isr expands per second.</td>
- <td>Meter</td>
- </tr>
- <tr>
- <td>isrShrinksPerSecond</td>
- <td>The number of isr shrinks per second.</td>
- <td>Meter</td>
</tr>
<tr>
- <td>failedIsrUpdatesPerSecond</td>
- <td>The failed isr updates per second.</td>
- <td>Meter</td>
- </tr>
- <tr>
- <td rowspan="5">table_bucket_log</td>
+ <td rowspan="3">table_bucket_log</td>
<td>numSegments</td>
<td>The number of segments in local storage for this table bucket.</td>
<td>Gauge</td>
@@ -683,16 +722,6 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>size</td>
<td>The total log sizes in local storage for this table bucket.</td>
<td>Gauge</td>
- </tr>
- <tr>
- <td>flushPerSecond</td>
- <td>The log flush count per second.</td>
- <td>Meter</td>
- </tr>
- <tr>
- <td>flushLatencyMs</td>
- <td>The log flush latency in ms.</td>
- <td>Histogram</td>
</tr>
<tr>
<td rowspan="3">table_bucket_remoteLog</td>
@@ -710,27 +739,6 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>The number of bytes written per second to this table.</td>
<td>Gauge</td>
</tr>
- <tr>
- <td rowspan="4">table_bucket_kv</td>
- <td>preWriteBufferFlushPerSecond</td>
- <td>The kv pre-write buffer flush count per second.</td>
- <td>Meter</td>
- </tr>
- <tr>
- <td>preWriteBufferFlushLatencyMs</td>
- <td>The kv pre-write buffer latency in ms.</td>
- <td>Histogram</td>
- </tr>
- <tr>
- <td>preWriteBufferTruncateAsDuplicatedPerSecond</td>
- <td>The number of kv pre-write buffer truncate due to the batch
duplicated per second.</td>
- <td>Meter</td>
- </tr>
- <tr>
- <td>preWriteBufferTruncateAsErrorPerSecond</td>
- <td>The number of kv pre-write buffer truncate due to the error happened
when writing cdc to log per second.</td>
- <td>Meter</td>
- </tr>
<tr>
<td rowspan="1">table_bucket_kv_snapshot</td>
<td>latestSnapshotSize</td>