This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 03313a9b0 [metric] Change some metrics from bucket-level to 
table-level and server-level (#1531)
03313a9b0 is described below

commit 03313a9b02dca57c87c406f0ecf396b08fa8726a
Author: xiaozhou <[email protected]>
AuthorDate: Fri Sep 12 14:07:44 2025 +0800

    [metric] Change some metrics from bucket-level to table-level and 
server-level (#1531)
---
 .../java/org/apache/fluss/metrics/MetricNames.java |  30 ++--
 .../java/org/apache/fluss/server/kv/KvManager.java |  17 ++-
 .../java/org/apache/fluss/server/kv/KvTablet.java  |  30 +---
 .../fluss/server/kv/prewrite/KvPreWriteBuffer.java |  17 ++-
 .../java/org/apache/fluss/server/log/LocalLog.java |  17 +--
 .../org/apache/fluss/server/log/LogManager.java    |  17 ++-
 .../org/apache/fluss/server/log/LogTablet.java     |   9 +-
 .../fluss/server/log/remote/LogTieringTask.java    |   8 +-
 .../server/metrics/group/BucketMetricGroup.java    |  22 ++-
 ...TableMetricGroup.java => TableMetricGroup.java} | 106 +++++++-------
 .../metrics/group/TabletServerMetricGroup.java     | 153 ++++++++++++++++++---
 .../org/apache/fluss/server/replica/Replica.java   |  35 ++---
 .../fluss/server/replica/ReplicaManager.java       |  50 ++++---
 .../apache/fluss/server/tablet/TabletServer.java   |   5 +-
 .../org/apache/fluss/server/kv/KvManagerTest.java  |  16 ++-
 .../org/apache/fluss/server/kv/KvTabletTest.java   |   3 +
 .../server/kv/prewrite/KvPreWriteBufferTest.java   |  13 +-
 .../org/apache/fluss/server/log/LocalLogTest.java  |   2 +
 .../apache/fluss/server/log/LogManagerTest.java    |  22 ++-
 .../org/apache/fluss/server/log/LogTabletTest.java |   3 +
 .../server/metrics/group/TestingMetricGroups.java  |   9 +-
 .../fluss/server/replica/ReplicaTestBase.java      |  15 +-
 .../replica/fetcher/ReplicaFetcherThreadTest.java  |   7 +-
 .../maintenance/observability/monitor-metrics.md   | 152 ++++++++++----------
 24 files changed, 474 insertions(+), 284 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java 
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index a51820e54..7bd21b552 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -94,13 +94,8 @@ public class MetricNames {
     public static final String FAILED_PREFIX_LOOKUP_REQUESTS_RATE =
             "failedPrefixLookupRequestsPerSecond";
 
-    // 
--------------------------------------------------------------------------------------------
-    // metrics for table bucket
-    // 
--------------------------------------------------------------------------------------------
-
     // for replica
     public static final String UNDER_REPLICATED = "underReplicated";
-    public static final String IN_SYNC_REPLICAS = "inSyncReplicasCount";
     public static final String UNDER_MIN_ISR = "underMinIsr";
     public static final String AT_MIN_ISR = "atMinIsr";
     public static final String ISR_EXPANDS_RATE = "isrExpandsPerSecond";
@@ -108,21 +103,28 @@ public class MetricNames {
     public static final String FAILED_ISR_UPDATES_RATE = 
"failedIsrUpdatesPerSecond";
 
     // for log tablet
-    public static final String LOG_NUM_SEGMENTS = "numSegments";
-    public static final String LOG_END_OFFSET = "endOffset";
-    public static final String LOG_SIZE = "size";
-    public static final String LOG_FLUSH_RATE = "flushPerSecond";
-    public static final String LOG_FLUSH_LATENCY_MS = "flushLatencyMs";
+    public static final String LOG_FLUSH_RATE = "logFlushPerSecond";
+    public static final String LOG_FLUSH_LATENCY_MS = "logFlushLatencyMs";
 
     // for kv tablet
-    public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
+    public static final String KV_FLUSH_RATE = "kvFlushPerSecond";
+    public static final String KV_FLUSH_LATENCY_MS = "kvFlushLatencyMs";
     public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE 
=
             "preWriteBufferTruncateAsDuplicatedPerSecond";
     public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
             "preWriteBufferTruncateAsErrorPerSecond";
-    public static final String KV_PRE_WRITE_BUFFER_FLUSH_RATE = 
"preWriteBufferFlushPerSecond";
-    public static final String KV_PRE_WRITE_BUFFER_FLUSH_LATENCY_MS =
-            "preWriteBufferFlushLatencyMs";
+
+    // 
--------------------------------------------------------------------------------------------
+    // metrics for table bucket
+    // 
--------------------------------------------------------------------------------------------
+
+    // for log tablet
+    public static final String LOG_NUM_SEGMENTS = "numSegments";
+    public static final String LOG_END_OFFSET = "endOffset";
+    public static final String LOG_SIZE = "size";
+
+    // for kv tablet
+    public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
 
     // 
--------------------------------------------------------------------------------------------
     // metrics for rpc client
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
index 22290e983..2d2764499 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvManager.java
@@ -36,6 +36,7 @@ import org.apache.fluss.server.TabletManagerBase;
 import org.apache.fluss.server.kv.rowmerger.RowMerger;
 import org.apache.fluss.server.log.LogManager;
 import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
@@ -69,6 +70,8 @@ public final class KvManager extends TabletManagerBase {
     private static final Logger LOG = LoggerFactory.getLogger(KvManager.class);
     private final LogManager logManager;
 
+    private final TabletServerMetricGroup serverMetricGroup;
+
     private final ZooKeeperClient zkClient;
 
     private final Map<TableBucket, KvTablet> currentKvs = 
MapUtils.newConcurrentHashMap();
@@ -91,7 +94,8 @@ public final class KvManager extends TabletManagerBase {
             Configuration conf,
             ZooKeeperClient zkClient,
             int recoveryThreadsPerDataDir,
-            LogManager logManager)
+            LogManager logManager,
+            TabletServerMetricGroup tabletServerMetricGroup)
             throws IOException {
         super(TabletType.KV, dataDir, conf, recoveryThreadsPerDataDir);
         this.logManager = logManager;
@@ -100,10 +104,14 @@ public final class KvManager extends TabletManagerBase {
         this.zkClient = zkClient;
         this.remoteKvDir = FlussPaths.remoteKvDir(conf);
         this.remoteFileSystem = remoteKvDir.getFileSystem();
+        this.serverMetricGroup = tabletServerMetricGroup;
     }
 
     public static KvManager create(
-            Configuration conf, ZooKeeperClient zkClient, LogManager 
logManager)
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            LogManager logManager,
+            TabletServerMetricGroup tabletServerMetricGroup)
             throws IOException {
         String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
         File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -112,7 +120,8 @@ public final class KvManager extends TabletManagerBase {
                 conf,
                 zkClient,
                 conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
-                logManager);
+                logManager,
+                tabletServerMetricGroup);
     }
 
     public void startup() {
@@ -171,6 +180,7 @@ public final class KvManager extends TabletManagerBase {
                                     logTablet,
                                     tabletDir,
                                     conf,
+                                    serverMetricGroup,
                                     arrowBufferAllocator,
                                     memorySegmentPool,
                                     kvFormat,
@@ -279,6 +289,7 @@ public final class KvManager extends TabletManagerBase {
                         logTablet,
                         tabletDir,
                         conf,
+                        serverMetricGroup,
                         arrowBufferAllocator,
                         memorySegmentPool,
                         tableInfo.getTableConfig().getKvFormat(),
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
index 4c3fde697..0437053af 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/kv/KvTablet.java
@@ -29,9 +29,6 @@ import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.metrics.MeterView;
-import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.KvRecord;
 import org.apache.fluss.record.KvRecordBatch;
@@ -55,7 +52,7 @@ import org.apache.fluss.server.kv.wal.IndexWalBuilder;
 import org.apache.fluss.server.kv.wal.WalBuilder;
 import org.apache.fluss.server.log.LogAppendInfo;
 import org.apache.fluss.server.log.LogTablet;
-import org.apache.fluss.server.metrics.group.BucketMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.utils.FatalErrorHandler;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
 import org.apache.fluss.types.DataType;
@@ -124,6 +121,7 @@ public final class KvTablet {
             TableBucket tableBucket,
             LogTablet logTablet,
             File kvTabletDir,
+            TabletServerMetricGroup serverMetricGroup,
             RocksDBKv rocksDBKv,
             long writeBatchSize,
             LogFormat logFormat,
@@ -139,7 +137,7 @@ public final class KvTablet {
         this.kvTabletDir = kvTabletDir;
         this.rocksDBKv = rocksDBKv;
         this.writeBatchSize = writeBatchSize;
-        this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter());
+        this.kvPreWriteBuffer = new KvPreWriteBuffer(createKvBatchWriter(), 
serverMetricGroup);
         this.logFormat = logFormat;
         this.arrowWriterProvider = new ArrowWriterPool(arrowBufferAllocator);
         this.memorySegmentPool = memorySegmentPool;
@@ -153,6 +151,7 @@ public final class KvTablet {
             LogTablet logTablet,
             File kvTabletDir,
             Configuration serverConf,
+            TabletServerMetricGroup serverMetricGroup,
             BufferAllocator arrowBufferAllocator,
             MemorySegmentPool memorySegmentPool,
             KvFormat kvFormat,
@@ -168,6 +167,7 @@ public final class KvTablet {
                 logTablet,
                 kvTabletDir,
                 serverConf,
+                serverMetricGroup,
                 arrowBufferAllocator,
                 memorySegmentPool,
                 kvFormat,
@@ -182,6 +182,7 @@ public final class KvTablet {
             LogTablet logTablet,
             File kvTabletDir,
             Configuration serverConf,
+            TabletServerMetricGroup serverMetricGroup,
             BufferAllocator arrowBufferAllocator,
             MemorySegmentPool memorySegmentPool,
             KvFormat kvFormat,
@@ -195,6 +196,7 @@ public final class KvTablet {
                 tableBucket,
                 logTablet,
                 kvTabletDir,
+                serverMetricGroup,
                 kv,
                 serverConf.get(ConfigOptions.KV_WRITE_BATCH_SIZE).getBytes(),
                 logTablet.getLogFormat(),
@@ -243,24 +245,6 @@ public final class KvTablet {
         return flushedLogOffset;
     }
 
-    public void registerMetrics(BucketMetricGroup bucketMetricGroup) {
-        MetricGroup metricGroup = bucketMetricGroup.addGroup("kv");
-
-        // about pre-write buffer.
-        metricGroup.meter(
-                MetricNames.KV_PRE_WRITE_BUFFER_FLUSH_RATE,
-                new MeterView(kvPreWriteBuffer.getFlushCount()));
-        metricGroup.histogram(
-                MetricNames.KV_PRE_WRITE_BUFFER_FLUSH_LATENCY_MS,
-                kvPreWriteBuffer.getFlushLatencyHistogram());
-        metricGroup.meter(
-                MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE,
-                new 
MeterView(kvPreWriteBuffer.getTruncateAsDuplicatedCount()));
-        metricGroup.meter(
-                MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE,
-                new MeterView(kvPreWriteBuffer.getTruncateAsErrorCount()));
-    }
-
     /**
      * Put the KvRecordBatch into the kv storage, and return the appended wal 
log info.
      *
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
index 7d53b2fe7..255d7a29e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBuffer.java
@@ -20,10 +20,9 @@ package org.apache.fluss.server.kv.prewrite;
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.memory.MemorySegment;
 import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
 import org.apache.fluss.metrics.Histogram;
-import org.apache.fluss.metrics.SimpleCounter;
 import org.apache.fluss.server.kv.KvBatchWriter;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.utils.MurmurHashUtils;
 
 import javax.annotation.Nullable;
@@ -61,7 +60,7 @@ import static 
org.apache.fluss.utils.UnsafeUtils.BYTE_ARRAY_BASE_OFFSET;
  *   <li>Buffer all the key-value pairs that are waiting for the corresponding 
WAL to be persisted.
  *       And flush these key-value pairs whose WAL has been persisted to 
underlying kv storage.
  *   <li>A temporary in-memory key-value buffer for put/get a key. Since Fluss 
will lookup the
- *       previous written data to generate CDC as WAL, it need a buffer to 
buffer the data been
+ *       previous written data to generate CDC as WAL, it needs a buffer to 
buffer the data been
  *       written before but is still waiting for the WAL to be persisted 
before flush to underlying
  *       kv storage.
  * </ol>
@@ -104,14 +103,14 @@ public class KvPreWriteBuffer implements AutoCloseable {
     // the max LSN in the buffer
     private long maxLogSequenceNumber = -1;
 
-    public KvPreWriteBuffer(KvBatchWriter kvBatchWriter) {
+    public KvPreWriteBuffer(
+            KvBatchWriter kvBatchWriter, TabletServerMetricGroup 
serverMetricGroup) {
         this.kvBatchWriter = kvBatchWriter;
 
-        flushCount = new SimpleCounter();
-        // consider won't flush frequently, we set a small window size
-        flushLatencyHistogram = new DescriptiveStatisticsHistogram(5);
-        truncateAsDuplicatedCount = new SimpleCounter();
-        truncateAsErrorCount = new SimpleCounter();
+        flushCount = serverMetricGroup.kvFlushCount();
+        flushLatencyHistogram = serverMetricGroup.kvFlushLatencyHistogram();
+        truncateAsDuplicatedCount = 
serverMetricGroup.kvTruncateAsDuplicatedCount();
+        truncateAsErrorCount = serverMetricGroup.kvTruncateAsErrorCount();
     }
 
     /**
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
index 8266c4ed5..061c83ebb 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LocalLog.java
@@ -25,11 +25,10 @@ import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
 import org.apache.fluss.metrics.Histogram;
-import org.apache.fluss.metrics.SimpleCounter;
 import org.apache.fluss.record.FileLogProjection;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.FlussPaths;
 
@@ -90,6 +89,7 @@ public final class LocalLog {
     public LocalLog(
             File logTabletDir,
             Configuration config,
+            TabletServerMetricGroup serverMetricGroup,
             LogSegments segments,
             long recoveryPoint,
             LogOffsetMetadata nextOffsetMetadata,
@@ -105,9 +105,8 @@ public final class LocalLog {
         this.logFormat = logFormat;
 
         lastFlushedTime = new AtomicLong(System.currentTimeMillis());
-        flushCount = new SimpleCounter();
-        // consider won't flush frequently, we set a small window size
-        flushLatencyHistogram = new DescriptiveStatisticsHistogram(5);
+        flushCount = serverMetricGroup.logFlushCount();
+        flushLatencyHistogram = serverMetricGroup.logFlushLatencyHistogram();
         localLogStartOffset = segments.isEmpty() ? 0L : 
segments.firstSegmentBaseOffset().get();
         localMaxTimestamp =
                 segments.isEmpty() ? 0L : 
segments.lastSegment().get().maxTimestampSoFar();
@@ -125,14 +124,6 @@ public final class LocalLog {
         return recoveryPoint;
     }
 
-    Histogram getFlushLatencyHistogram() {
-        return flushLatencyHistogram;
-    }
-
-    Counter getFlushCount() {
-        return flushCount;
-    }
-
     /** The offset metadata of the next message that will be appended to the 
log. */
     @VisibleForTesting
     LogOffsetMetadata getLocalLogEndOffsetMetadata() {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
index f881f388e..644eedfb4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogManager.java
@@ -29,6 +29,7 @@ import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.TabletManagerBase;
 import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.utils.FileUtils;
 import org.apache.fluss.utils.FlussPaths;
@@ -87,6 +88,7 @@ public final class LogManager extends TabletManagerBase {
     private final ZooKeeperClient zkClient;
     private final Scheduler scheduler;
     private final Clock clock;
+    private final TabletServerMetricGroup serverMetricGroup;
     private final ReentrantLock logCreationOrDeletionLock = new 
ReentrantLock();
 
     private final Map<TableBucket, LogTablet> currentLogs = 
MapUtils.newConcurrentHashMap();
@@ -100,19 +102,25 @@ public final class LogManager extends TabletManagerBase {
             ZooKeeperClient zkClient,
             int recoveryThreadsPerDataDir,
             Scheduler scheduler,
-            Clock clock)
+            Clock clock,
+            TabletServerMetricGroup serverMetricGroup)
             throws Exception {
         super(TabletType.LOG, dataDir, conf, recoveryThreadsPerDataDir);
         this.zkClient = zkClient;
         this.scheduler = scheduler;
         this.clock = clock;
+        this.serverMetricGroup = serverMetricGroup;
         createAndValidateDataDir(dataDir);
 
         initializeCheckpointMaps();
     }
 
     public static LogManager create(
-            Configuration conf, ZooKeeperClient zkClient, Scheduler scheduler, 
Clock clock)
+            Configuration conf,
+            ZooKeeperClient zkClient,
+            Scheduler scheduler,
+            Clock clock,
+            TabletServerMetricGroup serverMetricGroup)
             throws Exception {
         String dataDirString = conf.getString(ConfigOptions.DATA_DIR);
         File dataDir = new File(dataDirString).getAbsoluteFile();
@@ -122,7 +130,8 @@ public final class LogManager extends TabletManagerBase {
                 zkClient,
                 conf.getInt(ConfigOptions.NETTY_SERVER_NUM_WORKER_THREADS),
                 scheduler,
-                clock);
+                clock,
+                serverMetricGroup);
     }
 
     public void startup() {
@@ -246,6 +255,7 @@ public final class LogManager extends TabletManagerBase {
                                     tablePath,
                                     tabletDir,
                                     conf,
+                                    serverMetricGroup,
                                     0L,
                                     scheduler,
                                     logFormat,
@@ -348,6 +358,7 @@ public final class LogManager extends TabletManagerBase {
                         physicalTablePath,
                         tabletDir,
                         conf,
+                        serverMetricGroup,
                         logRecoveryPoint,
                         scheduler,
                         tableInfo.getTableConfig().getLogFormat(),
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 689fc122b..8eb19362f 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -30,7 +30,6 @@ import org.apache.fluss.metadata.LogFormat;
 import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TablePath;
-import org.apache.fluss.metrics.MeterView;
 import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.record.DefaultLogRecordBatch;
@@ -41,6 +40,7 @@ import org.apache.fluss.record.LogRecords;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.server.log.LocalLog.SegmentDeletionReason;
 import org.apache.fluss.server.metrics.group.BucketMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.clock.Clock;
 import org.apache.fluss.utils.concurrent.Scheduler;
@@ -277,6 +277,7 @@ public final class LogTablet {
             PhysicalTablePath tablePath,
             File tabletDir,
             Configuration conf,
+            TabletServerMetricGroup serverMetricGroup,
             long recoveryPoint,
             Scheduler scheduler,
             LogFormat logFormat,
@@ -313,6 +314,7 @@ public final class LogTablet {
                 new LocalLog(
                         tabletDir,
                         conf,
+                        serverMetricGroup,
                         segments,
                         recoveryPoint,
                         offsets.getNextOffsetMetadata(),
@@ -338,11 +340,6 @@ public final class LogTablet {
                 MetricNames.LOG_NUM_SEGMENTS, () -> 
localLog.getSegments().numberOfSegments());
         metricGroup.gauge(MetricNames.LOG_END_OFFSET, 
localLog::getLocalLogEndOffset);
         metricGroup.gauge(MetricNames.LOG_SIZE, () -> 
localLog.getSegments().sizeInBytes());
-
-        // about flush
-        metricGroup.meter(MetricNames.LOG_FLUSH_RATE, new 
MeterView(localLog.getFlushCount()));
-        metricGroup.histogram(
-                MetricNames.LOG_FLUSH_LATENCY_MS, 
localLog.getFlushLatencyHistogram());
     }
 
     public void updateLeaderEndOffsetSnapshot() {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index 4eace9cde..0e3666509 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -28,7 +28,7 @@ import 
org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
 import org.apache.fluss.server.entity.CommitRemoteLogManifestData;
 import org.apache.fluss.server.log.LogSegment;
 import org.apache.fluss.server.log.LogTablet;
-import org.apache.fluss.server.metrics.group.PhysicalTableMetricGroup;
+import org.apache.fluss.server.metrics.group.TableMetricGroup;
 import org.apache.fluss.server.replica.Replica;
 import org.apache.fluss.utils.clock.Clock;
 
@@ -121,7 +121,7 @@ public class LogTieringTask implements Runnable {
 
         try {
             LogTablet logTablet = replica.getLogTablet();
-            PhysicalTableMetricGroup metricGroup = replica.tableMetrics();
+            TableMetricGroup metricGroup = replica.tableMetrics();
             maybeUpdateCopiedOffset(logTablet);
 
             // Get these candidate log segments to copy and these expired 
remote log segments to
@@ -234,7 +234,7 @@ public class LogTieringTask implements Runnable {
             LogTablet log,
             List<EnrichedLogSegment> segments,
             List<RemoteLogSegment> copiedSegments,
-            PhysicalTableMetricGroup metricGroup)
+            TableMetricGroup metricGroup)
             throws Exception {
         long endOffset = -1;
         for (EnrichedLogSegment enrichedSegment : segments) {
@@ -449,7 +449,7 @@ public class LogTieringTask implements Runnable {
 
     /** Delete the remote log segment files. */
     private void deleteRemoteLogSegmentFiles(
-            List<RemoteLogSegment> remoteLogSegmentList, 
PhysicalTableMetricGroup metricGroup) {
+            List<RemoteLogSegment> remoteLogSegmentList, TableMetricGroup 
metricGroup) {
         for (RemoteLogSegment remoteLogSegment : remoteLogSegmentList) {
             try {
                 remoteLogStorage.deleteLogSegmentFiles(remoteLogSegment);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
index 9770a603d..fc8e281a5 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/BucketMetricGroup.java
@@ -21,22 +21,36 @@ import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 
+import javax.annotation.Nullable;
+
 import java.util.Map;
 
 import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
 
 /** Metrics for the table buckets with table as parent group. */
 public class BucketMetricGroup extends AbstractMetricGroup {
-
+    // will be null if the bucket doesn't belong to a partition
+    private final @Nullable String partitionName;
     private final int bucket;
 
-    public BucketMetricGroup(MetricRegistry registry, int bucket, 
PhysicalTableMetricGroup parent) {
+    public BucketMetricGroup(
+            MetricRegistry registry,
+            @Nullable String partitionName,
+            int bucket,
+            TableMetricGroup parent) {
         super(registry, makeScope(parent, String.valueOf(bucket)), parent);
+        this.partitionName = partitionName;
         this.bucket = bucket;
     }
 
     @Override
     protected void putVariables(Map<String, String> variables) {
+        if (partitionName != null) {
+            variables.put("partition", partitionName);
+        } else {
+            // an empty string value indicates a non-partitioned table
+            variables.put("partition", "");
+        }
         variables.put("bucket", String.valueOf(bucket));
     }
 
@@ -45,7 +59,7 @@ public class BucketMetricGroup extends AbstractMetricGroup {
         return "bucket";
     }
 
-    public PhysicalTableMetricGroup getPhysicalTableMetricGroup() {
-        return (PhysicalTableMetricGroup) parent;
+    public TableMetricGroup getTableMetricGroup() {
+        return (TableMetricGroup) parent;
     }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/PhysicalTableMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
similarity index 81%
rename from 
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/PhysicalTableMetricGroup.java
rename to 
fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
index 9a5370fac..7620bcbfd 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/PhysicalTableMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TableMetricGroup.java
@@ -17,7 +17,8 @@
 
 package org.apache.fluss.server.metrics.group;
 
-import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.Counter;
 import org.apache.fluss.metrics.MeterView;
@@ -35,34 +36,35 @@ import java.util.Map;
 import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
 
 /**
- * Metrics for the physical tables(tables or partitions) in server with {@link
- * TabletServerMetricGroup} as parent group.
+ * Metrics for the tables in server with {@link TabletServerMetricGroup} as parent group. The
+ * partition, if any, is reported as a variable by the child {@link BucketMetricGroup}.
  */
-public class PhysicalTableMetricGroup extends AbstractMetricGroup {
+public class TableMetricGroup extends AbstractMetricGroup {
 
-    private final Map<Integer, BucketMetricGroup> buckets = new HashMap<>();
+    private final Map<TableBucket, BucketMetricGroup> buckets = new 
HashMap<>();
 
-    private final PhysicalTablePath physicalTablePath;
+    private final TablePath tablePath;
 
-    // ---- metrics for log, when the table is for kv, it's for cdc log
+    // server-level metrics
+    private final TabletServerMetricGroup serverMetrics;
+
+    // table-level metrics for log; when the table is a kv table, they are for its cdc log
     private final LogMetricGroup logMetrics;
 
-    // ---- metrics for kv, will be null if the table isn't a kv table ----
+    // table-level metrics for kv, will be null if the table isn't a kv table
     private final @Nullable KvMetricGroup kvMetrics;
 
-    public PhysicalTableMetricGroup(
+    public TableMetricGroup(
             MetricRegistry registry,
-            PhysicalTablePath physicalTablePath,
+            TablePath tablePath,
             boolean isKvTable,
             TabletServerMetricGroup serverMetricGroup) {
         super(
                 registry,
-                makeScope(
-                        serverMetricGroup,
-                        physicalTablePath.getDatabaseName(),
-                        physicalTablePath.getTableName()),
+                makeScope(serverMetricGroup, tablePath.getDatabaseName(), 
tablePath.getTableName()),
                 serverMetricGroup);
-        this.physicalTablePath = physicalTablePath;
+        this.serverMetrics = serverMetricGroup;
+        this.tablePath = tablePath;
 
         // if is kv table, create kv metrics
         if (isKvTable) {
@@ -77,15 +79,8 @@ public class PhysicalTableMetricGroup extends 
AbstractMetricGroup {
 
     @Override
     protected void putVariables(Map<String, String> variables) {
-        variables.put("database", physicalTablePath.getDatabaseName());
-        variables.put("table", physicalTablePath.getTableName());
-
-        if (physicalTablePath.getPartitionName() != null) {
-            variables.put("partition", physicalTablePath.getPartitionName());
-        } else {
-            // value of empty string indicates non-partitioned tables
-            variables.put("partition", "");
-        }
+        variables.put("database", tablePath.getDatabaseName());
+        variables.put("table", tablePath.getTableName());
     }
 
     @Override
@@ -94,16 +89,19 @@ public class PhysicalTableMetricGroup extends 
AbstractMetricGroup {
         return "table";
     }
 
-    public Counter logMessageIn() {
-        return logMetrics.messagesIn;
+    public void incLogMessageIn(long n) {
+        logMetrics.messagesIn.inc(n);
+        serverMetrics.messageIn().inc(n);
     }
 
-    public Counter logBytesIn() {
-        return logMetrics.bytesIn;
+    public void incLogBytesIn(long n) {
+        logMetrics.bytesIn.inc(n);
+        serverMetrics.bytesIn().inc(n);
     }
 
-    public Counter logBytesOut() {
-        return logMetrics.bytesOut;
+    public void incLogBytesOut(long n) {
+        logMetrics.bytesOut.inc(n);
+        serverMetrics.bytesOut().inc(n);
     }
 
     public Counter totalFetchLogRequests() {
@@ -142,19 +140,21 @@ public class PhysicalTableMetricGroup extends 
AbstractMetricGroup {
         return logMetrics.remoteLogDeleteErrors;
     }
 
-    public Counter kvMessageIn() {
+    public void incKvMessageIn(long n) {
         if (kvMetrics == null) {
-            return NoOpCounter.INSTANCE;
+            NoOpCounter.INSTANCE.inc(n);
         } else {
-            return kvMetrics.messagesIn;
+            kvMetrics.messagesIn.inc(n);
+            serverMetrics.messageIn().inc(n);
         }
     }
 
-    public Counter kvBytesIn() {
+    public void incKvBytesIn(long n) {
         if (kvMetrics == null) {
-            return NoOpCounter.INSTANCE;
+            NoOpCounter.INSTANCE.inc(n);
         } else {
-            return kvMetrics.bytesIn;
+            kvMetrics.bytesIn.inc(n);
+            serverMetrics.bytesIn().inc(n);
         }
     }
 
@@ -225,13 +225,17 @@ public class PhysicalTableMetricGroup extends 
AbstractMetricGroup {
     // ------------------------------------------------------------------------
     //  bucket groups
     // ------------------------------------------------------------------------
-    public BucketMetricGroup addBucketMetricGroup(int bucketId) {
+    public BucketMetricGroup addBucketMetricGroup(
+            @Nullable String partitionName, TableBucket tableBucket) {
         return buckets.computeIfAbsent(
-                bucketId, (bucket) -> new BucketMetricGroup(registry, 
bucketId, this));
+                tableBucket,
+                (bucket) ->
+                        new BucketMetricGroup(
+                                registry, partitionName, 
tableBucket.getBucket(), this));
     }
 
-    public void removeBucketMetricGroup(int bucketId) {
-        BucketMetricGroup metricGroup = buckets.remove(bucketId);
+    public void removeBucketMetricGroup(TableBucket tableBucket) {
+        BucketMetricGroup metricGroup = buckets.remove(tableBucket);
         metricGroup.close();
     }
 
@@ -239,6 +243,10 @@ public class PhysicalTableMetricGroup extends 
AbstractMetricGroup {
         return buckets.size();
     }
 
+    public TabletServerMetricGroup getServerMetricGroup() {
+        return (TabletServerMetricGroup) parent;
+    }
+
     /** Metric group for specific kind of tablet of a table. */
     private static class TabletMetricGroup extends AbstractMetricGroup {
         private final TabletType tabletType;
@@ -248,12 +256,11 @@ public class PhysicalTableMetricGroup extends 
AbstractMetricGroup {
         protected final Counter bytesIn;
         protected final Counter bytesOut;
 
-        private TabletMetricGroup(
-                PhysicalTableMetricGroup physicalTableMetricGroup, TabletType 
tabletType) {
+        private TabletMetricGroup(TableMetricGroup tableMetricGroup, 
TabletType tabletType) {
             super(
-                    physicalTableMetricGroup.registry,
-                    makeScope(physicalTableMetricGroup, tabletType.name),
-                    physicalTableMetricGroup);
+                    tableMetricGroup.registry,
+                    makeScope(tableMetricGroup, tabletType.name),
+                    tableMetricGroup);
             this.tabletType = tabletType;
 
             messagesIn = new ThreadSafeSimpleCounter();
@@ -293,9 +300,8 @@ public class PhysicalTableMetricGroup extends 
AbstractMetricGroup {
         private final Counter remoteLogDeleteRequests;
         private final Counter remoteLogDeleteErrors;
 
-        private LogMetricGroup(
-                PhysicalTableMetricGroup physicalTableMetricGroup, TabletType 
groupType) {
-            super(physicalTableMetricGroup, groupType);
+        private LogMetricGroup(TableMetricGroup tableMetricGroup, TabletType 
groupType) {
+            super(tableMetricGroup, groupType);
             // for fetch log requests
             totalFetchLogRequests = new ThreadSafeSimpleCounter();
             meter(MetricNames.TOTAL_FETCH_LOG_REQUESTS_RATE, new 
MeterView(totalFetchLogRequests));
@@ -350,8 +356,8 @@ public class PhysicalTableMetricGroup extends 
AbstractMetricGroup {
         private final Counter totalPrefixLookupRequests;
         private final Counter failedPrefixLookupRequests;
 
-        public KvMetricGroup(PhysicalTableMetricGroup 
physicalTableMetricGroup) {
-            super(physicalTableMetricGroup, TabletType.KV);
+        public KvMetricGroup(TableMetricGroup tableMetricGroup) {
+            super(tableMetricGroup, TabletType.KV);
 
             // for lookup request
             totalLookupRequests = new ThreadSafeSimpleCounter();
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
index ce20e533f..cb1b75ff3 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/TabletServerMetricGroup.java
@@ -18,12 +18,18 @@
 package org.apache.fluss.server.metrics.group;
 
 import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.DescriptiveStatisticsHistogram;
+import org.apache.fluss.metrics.Histogram;
 import org.apache.fluss.metrics.MeterView;
 import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.SimpleCounter;
 import org.apache.fluss.metrics.ThreadSafeSimpleCounter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.utils.MapUtils;
 
@@ -33,8 +39,9 @@ import java.util.Map;
 public class TabletServerMetricGroup extends AbstractMetricGroup {
 
     private static final String NAME = "tabletserver";
+    private static final int WINDOW_SIZE = 1024;
 
-    private final Map<PhysicalTablePath, PhysicalTableMetricGroup> 
metricGroupByPhysicalTable =
+    private final Map<TablePath, TableMetricGroup> metricGroupByTable =
             MapUtils.newConcurrentHashMap();
 
     protected final String clusterId;
@@ -49,6 +56,26 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
     private final Counter delayedFetchFromFollowerExpireCount;
     private final Counter delayedFetchFromClientExpireCount;
 
+    // aggregated metrics
+    private final Counter messagesIn;
+    private final Counter bytesIn;
+    private final Counter bytesOut;
+
+    // aggregated log metrics
+    private final Counter logFlushCount;
+    private final Histogram logFlushLatencyHistogram;
+
+    // aggregated kv metrics
+    private final Counter kvFlushCount;
+    private final Histogram kvFlushLatencyHistogram;
+    private final Counter kvTruncateAsDuplicatedCount;
+    private final Counter kvTruncateAsErrorCount;
+
+    // aggregated replica metrics
+    private final Counter isrShrinks;
+    private final Counter isrExpands;
+    private final Counter failedIsrUpdates;
+
     public TabletServerMetricGroup(
             MetricRegistry registry, String clusterId, String rack, String 
hostname, int serverId) {
         super(registry, new String[] {clusterId, hostname, NAME}, null);
@@ -72,6 +99,42 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
         meter(
                 MetricNames.DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE,
                 new MeterView(delayedFetchFromClientExpireCount));
+
+        messagesIn = new ThreadSafeSimpleCounter();
+        meter(MetricNames.MESSAGES_IN_RATE, new MeterView(messagesIn));
+        bytesIn = new ThreadSafeSimpleCounter();
+        meter(MetricNames.BYTES_IN_RATE, new MeterView(bytesIn));
+        bytesOut = new ThreadSafeSimpleCounter();
+        meter(MetricNames.BYTES_OUT_RATE, new MeterView(bytesOut));
+
+        MetricGroup logMetricGroup = this.addGroup("log");
+        // about flush
+        logFlushCount = new SimpleCounter();
+        logMetricGroup.meter(MetricNames.LOG_FLUSH_RATE, new 
MeterView(logFlushCount));
+        logFlushLatencyHistogram = new 
DescriptiveStatisticsHistogram(WINDOW_SIZE);
+        logMetricGroup.histogram(MetricNames.LOG_FLUSH_LATENCY_MS, 
logFlushLatencyHistogram);
+
+        // about pre-write buffer.
+        kvFlushCount = new SimpleCounter();
+        meter(MetricNames.KV_FLUSH_RATE, new MeterView(kvFlushCount));
+        kvFlushLatencyHistogram = new 
DescriptiveStatisticsHistogram(WINDOW_SIZE);
+        histogram(MetricNames.KV_FLUSH_LATENCY_MS, kvFlushLatencyHistogram);
+        kvTruncateAsDuplicatedCount = new SimpleCounter();
+        meter(
+                MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_DUPLICATED_RATE,
+                new MeterView(kvTruncateAsDuplicatedCount));
+        kvTruncateAsErrorCount = new SimpleCounter();
+        meter(
+                MetricNames.KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE,
+                new MeterView(kvTruncateAsErrorCount));
+
+        // replica metrics
+        isrExpands = new SimpleCounter();
+        meter(MetricNames.ISR_EXPANDS_RATE, new MeterView(isrExpands));
+        isrShrinks = new SimpleCounter();
+        meter(MetricNames.ISR_SHRINKS_RATE, new MeterView(isrShrinks));
+        failedIsrUpdates = new SimpleCounter();
+        meter(MetricNames.FAILED_ISR_UPDATES_RATE, new 
MeterView(failedIsrUpdates));
     }
 
     @Override
@@ -112,33 +175,79 @@ public class TabletServerMetricGroup extends 
AbstractMetricGroup {
         return delayedFetchFromClientExpireCount;
     }
 
+    public Counter messageIn() {
+        return messagesIn;
+    }
+
+    public Counter bytesIn() {
+        return bytesIn;
+    }
+
+    public Counter bytesOut() {
+        return bytesOut;
+    }
+
+    public Counter logFlushCount() {
+        return logFlushCount;
+    }
+
+    public Histogram logFlushLatencyHistogram() {
+        return logFlushLatencyHistogram;
+    }
+
+    public Counter kvFlushCount() {
+        return kvFlushCount;
+    }
+
+    public Histogram kvFlushLatencyHistogram() {
+        return kvFlushLatencyHistogram;
+    }
+
+    public Counter kvTruncateAsDuplicatedCount() {
+        return kvTruncateAsDuplicatedCount;
+    }
+
+    public Counter kvTruncateAsErrorCount() {
+        return kvTruncateAsErrorCount;
+    }
+
+    public Counter isrShrinks() {
+        return isrShrinks;
+    }
+
+    public Counter isrExpands() {
+        return isrExpands;
+    }
+
+    public Counter failedIsrUpdates() {
+        return failedIsrUpdates;
+    }
+
     // ------------------------------------------------------------------------
     //  table buckets groups
     // ------------------------------------------------------------------------
-    public BucketMetricGroup addPhysicalTableBucketMetricGroup(
-            PhysicalTablePath physicalTablePath, int bucket, boolean 
isKvTable) {
-        PhysicalTableMetricGroup physicalTableMetricGroup =
-                metricGroupByPhysicalTable.computeIfAbsent(
-                        physicalTablePath,
-                        table ->
-                                new PhysicalTableMetricGroup(
-                                        registry, physicalTablePath, 
isKvTable, this));
-        return physicalTableMetricGroup.addBucketMetricGroup(bucket);
-    }
-
-    public void removeTableBucketMetricGroup(PhysicalTablePath 
physicalTablePath, int bucket) {
-        // get the metric group of the physical table
-        PhysicalTableMetricGroup physicalTableMetricGroup =
-                metricGroupByPhysicalTable.get(physicalTablePath);
-        // if get the physical table metric group
-        if (physicalTableMetricGroup != null) {
+    public BucketMetricGroup addTableBucketMetricGroup(
+            PhysicalTablePath physicalTablePath, TableBucket bucket, boolean 
isKvTable) {
+        TablePath tablePath = physicalTablePath.getTablePath();
+        TableMetricGroup tableMetricGroup =
+                metricGroupByTable.computeIfAbsent(
+                        tablePath,
+                        table -> new TableMetricGroup(registry, tablePath, 
isKvTable, this));
+        return 
tableMetricGroup.addBucketMetricGroup(physicalTablePath.getPartitionName(), 
bucket);
+    }
+
+    public void removeTableBucketMetricGroup(TablePath tablePath, TableBucket 
bucket) {
+        // get the metric group of the table
+        TableMetricGroup tableMetricGroup = metricGroupByTable.get(tablePath);
+        // if the table metric group exists
+        if (tableMetricGroup != null) {
             // remove the bucket metric group
-            physicalTableMetricGroup.removeBucketMetricGroup(bucket);
+            tableMetricGroup.removeBucketMetricGroup(bucket);
             // if no any bucket groups remain in the physical table metrics 
group,
             // close and remove the physical table metric group
-            if (physicalTableMetricGroup.bucketGroupsCount() == 0) {
-                physicalTableMetricGroup.close();
-                metricGroupByPhysicalTable.remove(physicalTablePath);
+            if (tableMetricGroup.bucketGroupsCount() == 0) {
+                tableMetricGroup.close();
+                metricGroupByTable.remove(tablePath);
             }
         }
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index e86a29b1d..feb657201 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -38,9 +38,6 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.Counter;
-import org.apache.fluss.metrics.MeterView;
-import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.metrics.SimpleCounter;
 import org.apache.fluss.record.DefaultValueRecordBatch;
 import org.apache.fluss.record.KvRecordBatch;
 import org.apache.fluss.record.LogRecords;
@@ -76,7 +73,8 @@ import 
org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
 import org.apache.fluss.server.log.remote.RemoteLogManager;
 import org.apache.fluss.server.metadata.ServerMetadataCache;
 import org.apache.fluss.server.metrics.group.BucketMetricGroup;
-import org.apache.fluss.server.metrics.group.PhysicalTableMetricGroup;
+import org.apache.fluss.server.metrics.group.TableMetricGroup;
+import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.replica.delay.DelayedFetchLog;
 import org.apache.fluss.server.replica.delay.DelayedOperationManager;
 import org.apache.fluss.server.replica.delay.DelayedTableBucketKey;
@@ -246,19 +244,12 @@ public final class Replica {
     }
 
     private void registerMetrics() {
-        bucketMetricGroup.gauge(MetricNames.UNDER_REPLICATED, () -> 
isUnderReplicated() ? 1 : 0);
-        bucketMetricGroup.gauge(
-                MetricNames.IN_SYNC_REPLICAS, () -> isLeader() ? 
isrState.isr().size() : 0);
-        bucketMetricGroup.gauge(MetricNames.UNDER_MIN_ISR, () -> 
isUnderMinIsr() ? 1 : 0);
-        bucketMetricGroup.gauge(MetricNames.AT_MIN_ISR, () -> isAtMinIsr() ? 1 
: 0);
-
-        isrExpands = new SimpleCounter();
-        bucketMetricGroup.meter(MetricNames.ISR_EXPANDS_RATE, new 
MeterView(isrExpands));
-        isrShrinks = new SimpleCounter();
-        bucketMetricGroup.meter(MetricNames.ISR_SHRINKS_RATE, new 
MeterView(isrShrinks));
-        failedIsrUpdates = new SimpleCounter();
-        bucketMetricGroup.meter(
-                MetricNames.FAILED_ISR_UPDATES_RATE, new 
MeterView(failedIsrUpdates));
+        // resolve the server-level group via: bucket -> table -> tabletServer
+        TabletServerMetricGroup serverMetrics =
+                bucketMetricGroup.getTableMetricGroup().getServerMetricGroup();
+        isrExpands = serverMetrics.isrExpands();
+        isrShrinks = serverMetrics.isrShrinks();
+        failedIsrUpdates = serverMetrics.failedIsrUpdates();
     }
 
     public boolean isKvTable() {
@@ -349,8 +340,8 @@ public final class Replica {
         return bucketMetricGroup;
     }
 
-    public PhysicalTableMetricGroup tableMetrics() {
-        return bucketMetricGroup.getPhysicalTableMetricGroup();
+    public TableMetricGroup tableMetrics() {
+        return bucketMetricGroup.getTableMetricGroup();
     }
 
     public void makeLeader(NotifyLeaderAndIsrData data) throws IOException {
@@ -637,8 +628,6 @@ public final class Replica {
                                 arrowCompressionInfo);
             }
 
-            kvTablet.registerMetrics(bucketMetricGroup);
-
             logTablet.updateMinRetainOffset(restoreStartOffset);
             recoverKvTablet(restoreStartOffset);
         } catch (Exception e) {
@@ -1749,12 +1738,12 @@ public final class Replica {
         }
     }
 
-    private boolean isUnderReplicated() {
+    public boolean isUnderReplicated() {
         // is leader and isr size less than numReplicas
         return isLeader() && isrState.isr().size() < 
tableConfig.getReplicationFactor();
     }
 
-    private boolean isUnderMinIsr() {
+    public boolean isUnderMinIsr() {
         return isLeader() && isrState.isr().size() < minInSyncReplicas;
     }
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 26b31bdd5..692bd4c2a 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -81,7 +81,7 @@ import org.apache.fluss.server.log.remote.RemoteLogManager;
 import org.apache.fluss.server.metadata.ClusterMetadata;
 import org.apache.fluss.server.metadata.TabletServerMetadataCache;
 import org.apache.fluss.server.metrics.group.BucketMetricGroup;
-import org.apache.fluss.server.metrics.group.PhysicalTableMetricGroup;
+import org.apache.fluss.server.metrics.group.TableMetricGroup;
 import org.apache.fluss.server.metrics.group.TabletServerMetricGroup;
 import org.apache.fluss.server.replica.delay.DelayedFetchLog;
 import org.apache.fluss.server.replica.delay.DelayedFetchLog.FetchBucketStatus;
@@ -302,6 +302,10 @@ public class ReplicaManager {
         serverMetricGroup.gauge(MetricNames.DELAYED_WRITE_COUNT, 
delayedWriteManager::numDelayed);
         serverMetricGroup.gauge(
                 MetricNames.DELAYED_FETCH_COUNT, 
delayedFetchLogManager::numDelayed);
+
+        serverMetricGroup.gauge(MetricNames.UNDER_REPLICATED, 
this::underReplicatedCount);
+        serverMetricGroup.gauge(MetricNames.UNDER_MIN_ISR, 
this::underMinIsrCount);
+        serverMetricGroup.gauge(MetricNames.AT_MIN_ISR, this::atMinIsrCount);
     }
 
     private Stream<Replica> onlineReplicas() {
@@ -318,6 +322,18 @@ public class ReplicaManager {
                 .map(t -> (Replica) t.get());
     }
 
+    private long underReplicatedCount() {
+        return onlineReplicas().filter(Replica::isUnderReplicated).count();
+    }
+
+    private long underMinIsrCount() {
+        return onlineReplicas().filter(Replica::isUnderMinIsr).count();
+    }
+
+    private long atMinIsrCount() {
+        return onlineReplicas().filter(Replica::isAtMinIsr).count();
+    }
+
     private int writerIdCount() {
         return onlineReplicas().map(Replica::writerIdCount).reduce(0, 
Integer::sum);
     }
@@ -478,7 +494,7 @@ public class ReplicaManager {
             Consumer<Map<TableBucket, LookupResultForBucket>> 
responseCallback) {
         Map<TableBucket, LookupResultForBucket> lookupResultForBucketMap = new 
HashMap<>();
         long startTime = System.currentTimeMillis();
-        PhysicalTableMetricGroup tableMetrics = null;
+        TableMetricGroup tableMetrics = null;
         for (Map.Entry<TableBucket, List<byte[]>> entry : 
entriesPerBucket.entrySet()) {
             TableBucket tb = entry.getKey();
             try {
@@ -509,7 +525,7 @@ public class ReplicaManager {
     public void prefixLookups(
             Map<TableBucket, List<byte[]>> entriesPerBucket,
             Consumer<Map<TableBucket, PrefixLookupResultForBucket>> 
responseCallback) {
-        PhysicalTableMetricGroup tableMetrics = null;
+        TableMetricGroup tableMetrics = null;
         Map<TableBucket, PrefixLookupResultForBucket> result = new HashMap<>();
         for (Map.Entry<TableBucket, List<byte[]>> entry : 
entriesPerBucket.entrySet()) {
             TableBucket tb = entry.getKey();
@@ -862,7 +878,7 @@ public class ReplicaManager {
         Map<TableBucket, ProduceLogResultForBucket> resultForBucketMap = new 
HashMap<>();
         for (Map.Entry<TableBucket, MemoryLogRecords> entry : 
entriesPerBucket.entrySet()) {
             TableBucket tb = entry.getKey();
-            PhysicalTableMetricGroup tableMetrics = null;
+            TableMetricGroup tableMetrics = null;
             try {
                 Replica replica = getReplicaOrException(tb);
                 tableMetrics = replica.tableMetrics();
@@ -881,8 +897,8 @@ public class ReplicaManager {
                 resultForBucketMap.put(
                         tb,
                         new ProduceLogResultForBucket(tb, baseOffset, 
appendInfo.lastOffset() + 1));
-                tableMetrics.logBytesIn().inc(appendInfo.validBytes());
-                tableMetrics.logMessageIn().inc(appendInfo.numMessages());
+                tableMetrics.incLogBytesIn(appendInfo.validBytes());
+                tableMetrics.incLogMessageIn(appendInfo.numMessages());
             } catch (Exception e) {
                 if (isUnexpectedException(e)) {
                     LOG.error("Error append records to local log on replica 
{}", tb, e);
@@ -908,7 +924,7 @@ public class ReplicaManager {
         Map<TableBucket, PutKvResultForBucket> putResultForBucketMap = new 
HashMap<>();
         for (Map.Entry<TableBucket, KvRecordBatch> entry : 
entriesPerBucket.entrySet()) {
             TableBucket tb = entry.getKey();
-            PhysicalTableMetricGroup tableMetrics = null;
+            TableMetricGroup tableMetrics = null;
             try {
                 LOG.trace("Put records to local kv tablet for table bucket 
{}", tb);
                 Replica replica = getReplicaOrException(tb);
@@ -925,11 +941,11 @@ public class ReplicaManager {
                         tb, new PutKvResultForBucket(tb, 
appendInfo.lastOffset() + 1));
 
                 // metric for kv
-                
tableMetrics.kvMessageIn().inc(entry.getValue().getRecordCount());
-                tableMetrics.kvBytesIn().inc(entry.getValue().sizeInBytes());
+                tableMetrics.incKvMessageIn(entry.getValue().getRecordCount());
+                tableMetrics.incKvBytesIn(entry.getValue().sizeInBytes());
                 // metric for cdc log of kv
-                tableMetrics.logBytesIn().inc(appendInfo.validBytes());
-                tableMetrics.logMessageIn().inc(appendInfo.numMessages());
+                tableMetrics.incLogBytesIn(appendInfo.validBytes());
+                tableMetrics.incLogMessageIn(appendInfo.numMessages());
             } catch (Exception e) {
                 if (isUnexpectedException(e)) {
                     LOG.error("Error put records to local kv on replica {}", 
tb, e);
@@ -953,7 +969,7 @@ public class ReplicaManager {
             int limit,
             Consumer<LimitScanResultForBucket> responseCallback) {
         LimitScanResultForBucket limitScanResultForBucket;
-        PhysicalTableMetricGroup tableMetrics = null;
+        TableMetricGroup tableMetrics = null;
         try {
             Replica replica = getReplicaOrException(tableBucket);
             tableMetrics = replica.tableMetrics();
@@ -988,7 +1004,7 @@ public class ReplicaManager {
         int limitBytes = fetchParams.maxFetchBytes();
         for (Map.Entry<TableBucket, FetchReqInfo> entry : 
bucketFetchInfo.entrySet()) {
             TableBucket tb = entry.getKey();
-            PhysicalTableMetricGroup tableMetrics = null;
+            TableMetricGroup tableMetrics = null;
             Replica replica = null;
             FetchReqInfo fetchReqInfo = entry.getValue();
             long fetchOffset = fetchReqInfo.getFetchOffset();
@@ -1031,7 +1047,7 @@ public class ReplicaManager {
                 if (isFromFollower) {
                     
serverMetricGroup.replicationBytesOut().inc(recordBatchSize);
                 } else {
-                    tableMetrics.logBytesOut().inc(recordBatchSize);
+                    tableMetrics.incLogBytesOut(recordBatchSize);
                 }
             } catch (Exception e) {
                 if (isUnexpectedException(e)) {
@@ -1382,7 +1398,7 @@ public class ReplicaManager {
             if (delete) {
                 if (allReplicas.remove(tb) != null) {
                     serverMetricGroup.removeTableBucketMetricGroup(
-                            replicaToDelete.getPhysicalTablePath(), 
tb.getBucket());
+                            
replicaToDelete.getPhysicalTablePath().getTablePath(), tb);
                     replicaToDelete.delete();
                     Path tabletParentDir = 
replicaToDelete.getTabletParentDir();
                     if (tb.getPartitionId() != null) {
@@ -1459,8 +1475,8 @@ public class ReplicaManager {
                 TableInfo tableInfo = getTableInfo(zkClient, tablePath);
                 boolean isKvTable = tableInfo.hasPrimaryKey();
                 BucketMetricGroup bucketMetricGroup =
-                        serverMetricGroup.addPhysicalTableBucketMetricGroup(
-                                physicalTablePath, tb.getBucket(), isKvTable);
+                        serverMetricGroup.addTableBucketMetricGroup(
+                                physicalTablePath, tb, isKvTable);
                 Replica replica =
                         new Replica(
                                 physicalTablePath,
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 9f8db8000..0e593c2c8 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -189,10 +189,11 @@ public class TabletServer extends ServerBase {
             this.scheduler = new FlussScheduler(conf.get(BACKGROUND_THREADS));
             scheduler.startup();
 
-            this.logManager = LogManager.create(conf, zkClient, scheduler, 
clock);
+            this.logManager =
+                    LogManager.create(conf, zkClient, scheduler, clock, 
tabletServerMetricGroup);
             logManager.startup();
 
-            this.kvManager = KvManager.create(conf, zkClient, logManager);
+            this.kvManager = KvManager.create(conf, zkClient, logManager, 
tabletServerMetricGroup);
             kvManager.startup();
 
             this.authorizer = AuthorizerLoader.createAuthorizer(conf, 
zkClient, pluginManager);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java
index 714adf545..7627e0ba3 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.fluss.record.TestData;
 import org.apache.fluss.row.encode.ValueEncoder;
 import org.apache.fluss.server.log.LogManager;
 import org.apache.fluss.server.log.LogTablet;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperExtension;
@@ -110,8 +111,15 @@ final class KvManagerTest {
         // we need a log manager for kv manager
 
         logManager =
-                LogManager.create(conf, zkClient, new FlussScheduler(1), 
SystemClock.getInstance());
-        kvManager = KvManager.create(conf, zkClient, logManager);
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
+        kvManager =
+                KvManager.create(
+                        conf, zkClient, logManager, 
TestingMetricGroups.TABLET_SERVER_METRICS);
         kvManager.startup();
     }
 
@@ -171,7 +179,9 @@ final class KvManagerTest {
 
         // restart
         kvManager.shutdown();
-        kvManager = KvManager.create(conf, zkClient, logManager);
+        kvManager =
+                KvManager.create(
+                        conf, zkClient, logManager, 
TestingMetricGroups.TABLET_SERVER_METRICS);
         kvManager.startup();
         kv1 = getOrCreateKv(tablePath1, partitionName, tableBucket1);
         kv2 = getOrCreateKv(tablePath2, partitionName, tableBucket2);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
index 6734a32d5..b9d9df99b 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/kv/KvTabletTest.java
@@ -48,6 +48,7 @@ import org.apache.fluss.server.log.FetchIsolation;
 import org.apache.fluss.server.log.LogAppendInfo;
 import org.apache.fluss.server.log.LogTablet;
 import org.apache.fluss.server.log.LogTestUtils;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
 import org.apache.fluss.types.DataTypes;
 import org.apache.fluss.types.RowType;
@@ -145,6 +146,7 @@ class KvTabletTest {
                 tablePath,
                 logTabletDir,
                 conf,
+                TestingMetricGroups.TABLET_SERVER_METRICS,
                 0,
                 new FlussScheduler(1),
                 LogFormat.ARROW,
@@ -173,6 +175,7 @@ class KvTabletTest {
                 logTablet,
                 tmpKvDir,
                 conf,
+                TestingMetricGroups.TABLET_SERVER_METRICS,
                 new RootAllocator(Long.MAX_VALUE),
                 new TestingMemorySegmentPool(10 * 1024),
                 KvFormat.COMPACTED,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
index 9c6418a4d..b39ec39ac 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/prewrite/KvPreWriteBufferTest.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.kv.prewrite;
 
 import org.apache.fluss.server.kv.KvBatchWriter;
 import org.apache.fluss.server.kv.prewrite.KvPreWriteBuffer.TruncateReason;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 
 import org.junit.jupiter.api.Test;
 
@@ -32,7 +33,9 @@ class KvPreWriteBufferTest {
 
     @Test
     void testIllegalLSN() {
-        KvPreWriteBuffer buffer = new KvPreWriteBuffer(new NopKvBatchWriter());
+        KvPreWriteBuffer buffer =
+                new KvPreWriteBuffer(
+                        new NopKvBatchWriter(), 
TestingMetricGroups.TABLET_SERVER_METRICS);
         bufferPut(buffer, "key1", "value1", 1);
         bufferDelete(buffer, "key1", 3);
 
@@ -51,7 +54,9 @@ class KvPreWriteBufferTest {
 
     @Test
     void testWriteAndFlush() throws Exception {
-        KvPreWriteBuffer buffer = new KvPreWriteBuffer(new NopKvBatchWriter());
+        KvPreWriteBuffer buffer =
+                new KvPreWriteBuffer(
+                        new NopKvBatchWriter(), 
TestingMetricGroups.TABLET_SERVER_METRICS);
         int elementCount = 0;
 
         // put a series of kv entries
@@ -132,7 +137,9 @@ class KvPreWriteBufferTest {
 
     @Test
     void testTruncate() {
-        KvPreWriteBuffer buffer = new KvPreWriteBuffer(new NopKvBatchWriter());
+        KvPreWriteBuffer buffer =
+                new KvPreWriteBuffer(
+                        new NopKvBatchWriter(), 
TestingMetricGroups.TABLET_SERVER_METRICS);
         int elementCount = 0;
 
         // put a series of kv entries
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/LocalLogTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/log/LocalLogTest.java
index 37bf57b4f..ad3d0c6ca 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LocalLogTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LocalLogTest.java
@@ -24,6 +24,7 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogTestBase;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.server.log.LocalLog.SegmentDeletionReason;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -408,6 +409,7 @@ final class LocalLogTest extends LogTestBase {
         return new LocalLog(
                 dir,
                 logConf,
+                TestingMetricGroups.TABLET_SERVER_METRICS,
                 segments,
                 recoverPoint,
                 nextOffsetMetadata,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
index 5ae6b0f22..2bbdd2a1c 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.LogTestBase;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.server.log.checkpoint.OffsetCheckpointFile;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.server.zk.NOPErrorHandler;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperExtension;
@@ -100,7 +101,12 @@ final class LogManagerTest extends LogTestBase {
 
         registerTableInZkClient();
         logManager =
-                LogManager.create(conf, zkClient, new FlussScheduler(1), 
SystemClock.getInstance());
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
         logManager.startup();
     }
 
@@ -194,7 +200,12 @@ final class LogManagerTest extends LogTestBase {
         logManager = null;
 
         LogManager newLogManager =
-                LogManager.create(conf, zkClient, new FlussScheduler(1), 
SystemClock.getInstance());
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
         newLogManager.startup();
         logManager = newLogManager;
         log1 = getOrCreateLog(tablePath1, null, tableBucket1);
@@ -234,7 +245,12 @@ final class LogManagerTest extends LogTestBase {
         assertThat(new File(dataDir, CLEAN_SHUTDOWN_FILE).exists()).isTrue();
 
         LogManager newLogManager =
-                LogManager.create(conf, zkClient, new FlussScheduler(1), 
SystemClock.getInstance());
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        new FlussScheduler(1),
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
         assertThat(new File(dataDir, CLEAN_SHUTDOWN_FILE).exists()).isTrue();
         newLogManager.startup();
         logManager = newLogManager;
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
index 468187ded..695dab198 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogTabletTest.java
@@ -28,6 +28,7 @@ import org.apache.fluss.record.LogRecordBatch;
 import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.record.LogTestBase;
 import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.types.RowType;
 import org.apache.fluss.utils.CloseableIterator;
 import org.apache.fluss.utils.clock.SystemClock;
@@ -89,6 +90,7 @@ final class LogTabletTest extends LogTestBase {
                         PhysicalTablePath.of(DATA1_TABLE_PATH),
                         logDir,
                         conf,
+                        TestingMetricGroups.TABLET_SERVER_METRICS,
                         0,
                         scheduler,
                         LogFormat.ARROW,
@@ -491,6 +493,7 @@ final class LogTabletTest extends LogTestBase {
                 PhysicalTablePath.of(DATA1_TABLE_PATH),
                 logDir,
                 config,
+                TestingMetricGroups.TABLET_SERVER_METRICS,
                 0,
                 scheduler,
                 LogFormat.ARROW,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
index d3e0a5726..15d1316b5 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
@@ -17,7 +17,6 @@
 
 package org.apache.fluss.server.metrics.group;
 
-import org.apache.fluss.metadata.PhysicalTablePath;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.registry.NOPMetricRegistry;
 
@@ -30,13 +29,13 @@ public class TestingMetricGroups {
     public static final CoordinatorMetricGroup COORDINATOR_METRICS =
             new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1", 
"host", "0");
 
-    public static final PhysicalTableMetricGroup TABLE_METRICS =
-            new PhysicalTableMetricGroup(
+    public static final TableMetricGroup TABLE_METRICS =
+            new TableMetricGroup(
                     NOPMetricRegistry.INSTANCE,
-                    PhysicalTablePath.of(TablePath.of("mydb", "mytable"), 
null),
+                    TablePath.of("mydb", "mytable"),
                     false,
                     TABLET_SERVER_METRICS);
 
     public static final BucketMetricGroup BUCKET_METRICS =
-            new BucketMetricGroup(NOPMetricRegistry.INSTANCE, 0, 
TABLE_METRICS);
+            new BucketMetricGroup(NOPMetricRegistry.INSTANCE, null, 0, 
TABLE_METRICS);
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 8f0b271bb..36862c4e5 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -178,10 +178,18 @@ public class ReplicaTestBase {
         scheduler.startup();
 
         manualClock = new ManualClock(System.currentTimeMillis());
-        logManager = LogManager.create(conf, zkClient, scheduler, manualClock);
+        logManager =
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        scheduler,
+                        manualClock,
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
         logManager.startup();
 
-        kvManager = KvManager.create(conf, zkClient, logManager);
+        kvManager =
+                KvManager.create(
+                        conf, zkClient, logManager, 
TestingMetricGroups.TABLET_SERVER_METRICS);
         kvManager.startup();
 
         serverMetadataCache =
@@ -436,8 +444,7 @@ public class ReplicaTestBase {
         BucketMetricGroup metricGroup =
                 replicaManager
                         .getServerMetricGroup()
-                        .addPhysicalTableBucketMetricGroup(
-                                physicalTablePath, tableBucket.getBucket(), 
isPkTable);
+                        .addTableBucketMetricGroup(physicalTablePath, 
tableBucket, isPkTable);
         return new Replica(
                 physicalTablePath,
                 tableBucket,
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
index 777999f7e..a982ff058 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java
@@ -323,7 +323,12 @@ public class ReplicaFetcherThreadTest {
         scheduler.startup();
 
         LogManager logManager =
-                LogManager.create(conf, zkClient, scheduler, 
SystemClock.getInstance());
+                LogManager.create(
+                        conf,
+                        zkClient,
+                        scheduler,
+                        SystemClock.getInstance(),
+                        TestingMetricGroups.TABLET_SERVER_METRICS);
         logManager.startup();
         ReplicaManager replicaManager =
                 new TestingReplicaManager(
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index 0aa996364..ac351d510 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -357,8 +357,23 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-      <th rowspan="10"><strong>tabletserver</strong></th>
-      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="10">-</td>
+      <th rowspan="25"><strong>tabletserver</strong></th>
+      <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="25">-</td>
+      <td>messagesInPerSecond</td>
+      <td>The number of messages written per second to this server.</td>
+      <td>Meter</td>
+    </tr>
+     <tr>
+      <td>bytesInPerSecond</td>
+      <td>The number of bytes written per second to this server.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
+      <td>bytesOutPerSecond</td>
+      <td>The number of bytes read per second from this server.</td>
+      <td>Meter</td>
+    </tr>
+    <tr>
       <td>replicationBytesInPerSecond</td>
       <td>The bytes of data write into follower replica for data sync.</td>
       <td>Meter</td>
@@ -408,6 +423,66 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>The delayed fetch log operation from client expire count per second 
in this TabletServer.</td>
       <td>Meter</td>
     </tr>
+    <tr>
+      <td>underMinIsr</td>
+      <td>The count of buckets that are under min isr in this server.</td>
+      <td>Gauge</td>
+    </tr>
+     <tr>
+      <td>underReplicated</td>
+      <td>The count of buckets that are under the replication factor in this 
server.</td>
+      <td>Gauge</td>
+    </tr>
+     <tr>
+      <td>atMinIsr</td>
+      <td>The count of buckets that are at min isr in this server.</td>
+      <td>Gauge</td>
+    </tr>
+     <tr>
+      <td>isrExpandsPerSecond</td>
+      <td>The number of isr expands per second.</td>
+      <td>Meter</td>
+    </tr>
+     <tr>
+      <td>isrShrinksPerSecond</td>
+      <td>The number of isr shrinks per second.</td>
+      <td>Meter</td>
+    </tr>
+     <tr>
+      <td>failedIsrUpdatesPerSecond</td>
+      <td>The failed isr updates per second.</td>
+      <td>Meter</td>
+    </tr>
+     <tr>
+      <td>logFlushPerSecond</td>
+      <td>The log flush count per second.</td>
+      <td>Meter</td>
+    </tr>
+     <tr>
+      <td>logFlushLatencyMs</td>
+      <td>The log flush latency in ms.</td>
+      <td>Histogram</td>
+    </tr>
+    <tr>
+      <td>kvFlushPerSecond</td>
+      <td>The number of kv pre-write buffer flushes to the underlying RocksDB per 
second.</td>
+      <td>Meter</td>
+    </tr>
+     <tr>
+      <td>kvFlushLatencyMs</td>
+      <td>The latency in ms of flushing the kv pre-write buffer to the underlying 
RocksDB.</td>
+      <td>Histogram</td>
+    </tr>
+     <tr>
+      <td>preWriteBufferTruncateAsDuplicatedPerSecond</td>
+      <td>The number of kv pre-write buffer truncations per second caused by 
duplicated batches.</td>
+      <td>Meter</td>
+    </tr>
+     <tr>
+      <td>preWriteBufferTruncateAsErrorPerSecond</td>
+      <td>The number of kv pre-write buffer truncations per second caused by errors 
that happened when writing cdc to log.</td>
+      <td>Meter</td>
+    </tr>
   </tbody>
 </table>
 
@@ -531,10 +606,10 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-      <th rowspan="39"><strong>tabletserver</strong></th>
+      <th rowspan="30"><strong>tabletserver</strong></th>
       <td rowspan="20">table</td>
       <td>messagesInPerSecond</td>
-      <td>The number of messages written per second to this table</td>
+      <td>The number of messages written per second to this table.</td>
       <td>Meter</td>
     </tr>
      <tr>
@@ -631,45 +706,9 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>remoteLogDeleteErrorPerSecond</td>
       <td>The number of failed delete remote log requests to delete remote log 
after log ttl per second.</td>
       <td>Meter</td>
-    </tr>
-    <tr>
-      <td rowspan="7">table_bucket</td>
-      <td>inSyncReplicasCount</td>
-      <td>The inSync replicas count of this table bucket.</td>
-      <td>Gauge</td>
-    </tr>
-     <tr>
-      <td>underMinIsr</td>
-      <td>If this bucket is under min isr, this value is 1, otherwise 0.</td>
-      <td>Gauge</td>
-    </tr>
-     <tr>
-      <td>underReplicated</td>
-      <td>If this bucket is under replication factor, this value is 1, 
otherwise 0.</td>
-      <td>Gauge</td>
-    </tr>
-     <tr>
-      <td>atMinIsr</td>
-      <td>If this bucket is at min isr, this value is 1, otherwise 0.</td>
-      <td>Gauge</td>
-    </tr>
-     <tr>
-      <td>isrExpandsPerSecond</td>
-      <td>The number of isr expands per second.</td>
-      <td>Meter</td>
-    </tr>
-     <tr>
-      <td>isrShrinksPerSecond</td>
-      <td>The number of isr shrinks per second.</td>
-      <td>Meter</td>
     </tr>
      <tr>
-      <td>failedIsrUpdatesPerSecond</td>
-      <td>The failed isr updates per second.</td>
-      <td>Meter</td>
-    </tr>
-     <tr>
-      <td rowspan="5">table_bucket_log</td>
+      <td rowspan="3">table_bucket_log</td>
       <td>numSegments</td>
       <td>The number of segments in local storage for this table bucket.</td>
       <td>Gauge</td>
@@ -683,16 +722,6 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>size</td>
       <td>The total log sizes in local storage for this table bucket.</td>
       <td>Gauge</td>
-    </tr>
-     <tr>
-      <td>flushPerSecond</td>
-      <td>The log flush count per second.</td>
-      <td>Meter</td>
-    </tr>
-     <tr>
-      <td>flushLatencyMs</td>
-      <td>The log flush latency in ms.</td>
-      <td>Histogram</td>
     </tr>
     <tr>
       <td rowspan="3">table_bucket_remoteLog</td>
@@ -710,27 +739,6 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>The number of bytes written per second to this table.</td>
       <td>Gauge</td>
     </tr>
-    <tr>
-      <td rowspan="4">table_bucket_kv</td>
-      <td>preWriteBufferFlushPerSecond</td>
-      <td>The kv pre-write buffer flush count per second.</td>
-      <td>Meter</td>
-    </tr>
-     <tr>
-      <td>preWriteBufferFlushLatencyMs</td>
-      <td>The kv pre-write buffer latency in ms.</td>
-      <td>Histogram</td>
-    </tr>
-     <tr>
-      <td>preWriteBufferTruncateAsDuplicatedPerSecond</td>
-      <td>The number of kv pre-write buffer truncate due to the batch 
duplicated per second.</td>
-      <td>Meter</td>
-    </tr>
-     <tr>
-      <td>preWriteBufferTruncateAsErrorPerSecond</td>
-      <td>The number of kv pre-write buffer truncate due to the error happened 
when writing cdc to log per second.</td>
-      <td>Meter</td>
-    </tr>
     <tr>
       <td rowspan="1">table_bucket_kv_snapshot</td>
       <td>latestSnapshotSize</td>

Reply via email to