This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 5095236e6 [metrics] Adding Server-Level Storage Aggregation Metrics
(#1548)
5095236e6 is described below
commit 5095236e66afc0c0bc49e9d1f393dd82518fd30d
Author: yunhong <[email protected]>
AuthorDate: Wed Sep 17 00:36:15 2025 +0800
[metrics] Adding Server-Level Storage Aggregation Metrics (#1548)
---
.../java/org/apache/fluss/metrics/MetricNames.java | 17 +-
.../coordinator/CompletedSnapshotStoreManager.java | 67 ++++++--
.../coordinator/CoordinatorEventProcessor.java | 53 ++++++-
.../server/kv/snapshot/CompletedSnapshotStore.java | 8 +
.../kv/snapshot/PeriodicSnapshotManager.java | 9 +-
.../server/kv/snapshot/SharedKvFileRegistry.java | 10 ++
.../org/apache/fluss/server/log/LogTablet.java | 21 ++-
.../fluss/server/log/remote/LogTieringTask.java | 2 +
.../fluss/server/log/remote/RemoteLogManager.java | 5 +
.../fluss/server/log/remote/RemoteLogManifest.java | 8 +
.../fluss/server/log/remote/RemoteLogTablet.java | 6 +-
.../metrics/group/CoordinatorMetricGroup.java | 172 +++++++++++++++++++++
.../org/apache/fluss/server/replica/Replica.java | 38 ++++-
.../fluss/server/replica/ReplicaManager.java | 39 +++++
.../CompletedSnapshotStoreManagerTest.java | 27 +++-
.../coordinator/CoordinatorEventProcessorTest.java | 3 +-
.../maintenance/observability/monitor-metrics.md | 60 +++++--
17 files changed, 491 insertions(+), 54 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 7bd21b552..9242b4daa 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -47,6 +47,11 @@ public class MetricNames {
public static final String EVENT_QUEUE_TIME_MS = "eventQueueTimeMs";
public static final String EVENT_PROCESSING_TIME_MS =
"eventProcessingTimeMs";
+ // for kv tablet which is reported by coordinator
+ public static final String KV_NUM_SNAPSHOTS = "numKvSnapshots";
+ public static final String KV_ALL_SNAPSHOT_SIZE = "allKvSnapshotSize";
+ public static final String SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE =
"remoteKvSize";
+
//
--------------------------------------------------------------------------------------------
// metrics for tablet server
//
--------------------------------------------------------------------------------------------
@@ -63,6 +68,11 @@ public class MetricNames {
public static final String DELAYED_FETCH_FROM_CLIENT_EXPIRES_RATE =
"delayedFetchFromClientExpiresPerSecond";
+ public static final String SERVER_LOGICAL_STORAGE_LOG_SIZE = "logSize";
+ public static final String SERVER_LOGICAL_STORAGE_KV_SIZE = "kvSize";
+ public static final String SERVER_PHYSICAL_STORAGE_LOCAL_SIZE =
"localSize";
+ public static final String SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE =
"remoteLogSize";
+
//
--------------------------------------------------------------------------------------------
// metrics for table
//
--------------------------------------------------------------------------------------------
@@ -121,10 +131,11 @@ public class MetricNames {
// for log tablet
public static final String LOG_NUM_SEGMENTS = "numSegments";
public static final String LOG_END_OFFSET = "endOffset";
- public static final String LOG_SIZE = "size";
+ public static final String REMOTE_LOG_SIZE = "size";
- // for kv tablet
- public static final String KV_LATEST_SNAPSHOT_SIZE = "latestSnapshotSize";
+ // for logical storage
+ public static final String LOCAL_STORAGE_LOG_SIZE = "logSize";
+ public static final String LOCAL_STORAGE_KV_SIZE = "kvSize";
//
--------------------------------------------------------------------------------------------
// metrics for rpc client
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
index 0ea715930..b7c92289b 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManager.java
@@ -19,12 +19,16 @@ package org.apache.fluss.server.coordinator;
import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandle;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandleStore;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
import org.apache.fluss.server.kv.snapshot.SharedKvFileRegistry;
import
org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
+import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.utils.MapUtils;
@@ -60,18 +64,19 @@ public class CompletedSnapshotStoreManager {
private final Executor ioExecutor;
private final Function<ZooKeeperClient, CompletedSnapshotHandleStore>
makeZookeeperCompletedSnapshotHandleStore;
+ private final CoordinatorMetricGroup coordinatorMetricGroup;
public CompletedSnapshotStoreManager(
int maxNumberOfSnapshotsToRetain,
Executor ioExecutor,
- ZooKeeperClient zooKeeperClient) {
- checkArgument(
- maxNumberOfSnapshotsToRetain > 0,
"maxNumberOfSnapshotsToRetain must be positive");
- this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
- this.zooKeeperClient = zooKeeperClient;
- this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
- this.ioExecutor = ioExecutor;
- this.makeZookeeperCompletedSnapshotHandleStore =
ZooKeeperCompletedSnapshotHandleStore::new;
+ ZooKeeperClient zooKeeperClient,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
+ this(
+ maxNumberOfSnapshotsToRetain,
+ ioExecutor,
+ zooKeeperClient,
+ ZooKeeperCompletedSnapshotHandleStore::new,
+ coordinatorMetricGroup);
}
@VisibleForTesting
@@ -80,7 +85,8 @@ public class CompletedSnapshotStoreManager {
Executor ioExecutor,
ZooKeeperClient zooKeeperClient,
Function<ZooKeeperClient, CompletedSnapshotHandleStore>
- makeZookeeperCompletedSnapshotHandleStore) {
+ makeZookeeperCompletedSnapshotHandleStore,
+ CoordinatorMetricGroup coordinatorMetricGroup) {
checkArgument(
maxNumberOfSnapshotsToRetain > 0,
"maxNumberOfSnapshotsToRetain must be positive");
this.maxNumberOfSnapshotsToRetain = maxNumberOfSnapshotsToRetain;
@@ -88,9 +94,34 @@ public class CompletedSnapshotStoreManager {
this.bucketCompletedSnapshotStores = MapUtils.newConcurrentHashMap();
this.ioExecutor = ioExecutor;
this.makeZookeeperCompletedSnapshotHandleStore =
makeZookeeperCompletedSnapshotHandleStore;
+ this.coordinatorMetricGroup = coordinatorMetricGroup;
+
+ registerMetrics();
}
- public CompletedSnapshotStore
getOrCreateCompletedSnapshotStore(TableBucket tableBucket) {
+ private void registerMetrics() {
+ MetricGroup physicalStorage =
coordinatorMetricGroup.addGroup("physicalStorage");
+ physicalStorage.gauge(
+ MetricNames.SERVER_PHYSICAL_STORAGE_REMOTE_KV_SIZE,
+ this::physicalStorageRemoteKvSize);
+ }
+
+ private long physicalStorageRemoteKvSize() {
+ return bucketCompletedSnapshotStores.values().stream()
+ .map(CompletedSnapshotStore::getPhysicalStorageRemoteKvSize)
+ .reduce(0L, Long::sum);
+ }
+
+ private long getNumSnapshots(TableBucket tableBucket) {
+ return
bucketCompletedSnapshotStores.get(tableBucket).getNumSnapshots();
+ }
+
+ private long getAllSnapshotSize(TableBucket tableBucket) {
+ return
bucketCompletedSnapshotStores.get(tableBucket).getPhysicalStorageRemoteKvSize();
+ }
+
+ public CompletedSnapshotStore getOrCreateCompletedSnapshotStore(
+ TablePath tablePath, TableBucket tableBucket) {
return bucketCompletedSnapshotStores.computeIfAbsent(
tableBucket,
(bucket) -> {
@@ -104,6 +135,22 @@ public class CompletedSnapshotStoreManager {
"Created snapshot store for table bucket {} in
{} ms.",
bucket,
end - start);
+
+ MetricGroup bucketMetricGroup =
+
coordinatorMetricGroup.getTableBucketMetricGroup(
+ tablePath, tableBucket);
+ if (bucketMetricGroup != null) {
+ LOG.info("Add bucketMetricGroup for tableBucket
{}.", bucket);
+ bucketMetricGroup.gauge(
+ MetricNames.KV_NUM_SNAPSHOTS, () ->
getNumSnapshots(bucket));
+ bucketMetricGroup.gauge(
+ MetricNames.KV_ALL_SNAPSHOT_SIZE,
+ () -> getAllSnapshotSize(bucket));
+ } else {
+ LOG.warn(
+ "Failed to add bucketMetricGroup for
tableBucket {} when creating completed snapshot.",
+ bucket);
+ }
return snapshotStore;
} catch (Exception e) {
throw new RuntimeException(
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index ba48a6240..66d52d9fc 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -188,7 +188,8 @@ public class CoordinatorEventProcessor implements
EventProcessor {
new CompletedSnapshotStoreManager(
conf.getInt(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS),
ioExecutor,
- zooKeeperClient);
+ zooKeeperClient,
+ coordinatorMetricGroup);
this.autoPartitionManager = autoPartitionManager;
this.lakeTableTieringManager = lakeTableTieringManager;
this.coordinatorMetricGroup = coordinatorMetricGroup;
@@ -456,6 +457,20 @@ public class CoordinatorEventProcessor implements
EventProcessor {
coordinatorContext.putBucketLeaderAndIsr(tableBucket,
leaderAndIsr);
}
}
+
+ // register table/bucket metrics when initializing context.
+ TablePath tablePath = coordinatorContext.getTablePathById(tableId);
+ if (tablePath != null) {
+ coordinatorMetricGroup.addTableBucketMetricGroup(
+ PhysicalTablePath.of(
+ tablePath,
+ partitionId == null
+ ? null
+ :
coordinatorContext.getPartitionName(partitionId)),
+ tableId,
+ partitionId,
+ tableAssignment.getBucketAssignments().keySet());
+ }
}
private void onShutdown() {
@@ -529,10 +544,10 @@ public class CoordinatorEventProcessor implements
EventProcessor {
return;
}
TableInfo tableInfo = createTableEvent.getTableInfo();
+ TablePath tablePath = tableInfo.getTablePath();
coordinatorContext.putTableInfo(tableInfo);
TableAssignment tableAssignment =
createTableEvent.getTableAssignment();
- tableManager.onCreateNewTable(
- tableInfo.getTablePath(), tableInfo.getTableId(),
tableAssignment);
+ tableManager.onCreateNewTable(tablePath, tableInfo.getTableId(),
tableAssignment);
if (createTableEvent.isAutoPartitionTable()) {
autoPartitionManager.addAutoPartitionTable(tableInfo, true);
}
@@ -551,6 +566,14 @@ public class CoordinatorEventProcessor implements
EventProcessor {
null,
null,
tableBuckets);
+
+ // register table metrics.
+ coordinatorMetricGroup.addTableBucketMetricGroup(
+ PhysicalTablePath.of(tablePath),
+ tableId,
+ null,
+ tableAssignment.getBucketAssignments().keySet());
+
} else {
updateTabletServerMetadataCache(
new
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
@@ -568,10 +591,11 @@ public class CoordinatorEventProcessor implements
EventProcessor {
}
long tableId = createPartitionEvent.getTableId();
+ TablePath tablePath = createPartitionEvent.getTablePath();
String partitionName = createPartitionEvent.getPartitionName();
PartitionAssignment partitionAssignment =
createPartitionEvent.getPartitionAssignment();
tableManager.onCreateNewPartition(
- createPartitionEvent.getTablePath(),
+ tablePath,
tableId,
createPartitionEvent.getPartitionId(),
partitionName,
@@ -585,6 +609,14 @@ public class CoordinatorEventProcessor implements
EventProcessor {
.forEach(
bucketId ->
tableBuckets.add(new TableBucket(tableId,
partitionId, bucketId)));
+
+ // register partition metrics.
+ coordinatorMetricGroup.addTableBucketMetricGroup(
+ PhysicalTablePath.of(tablePath, partitionName),
+ tableId,
+ partitionId,
+ partitionAssignment.getBucketAssignments().keySet());
+
updateTabletServerMetadataCache(
new
HashSet<>(coordinatorContext.getLiveTabletServers().values()),
null,
@@ -617,6 +649,9 @@ public class CoordinatorEventProcessor implements
EventProcessor {
tableId,
null,
Collections.emptySet());
+
+ // remove table metrics.
+
coordinatorMetricGroup.removeTableMetricGroup(dropTableInfo.getTablePath(),
tableId);
}
private void processDropPartition(DropPartitionEvent dropPartitionEvent) {
@@ -644,6 +679,10 @@ public class CoordinatorEventProcessor implements
EventProcessor {
tableId,
tablePartition.getPartitionId(),
Collections.emptySet());
+
+ // remove partition metrics.
+ coordinatorMetricGroup.removeTablePartitionMetricsGroup(
+ dropTableInfo.getTablePath(), tableId,
tablePartition.getPartitionId());
}
private void processDeleteReplicaResponseReceived(
@@ -990,15 +1029,17 @@ public class CoordinatorEventProcessor implements
EventProcessor {
return;
}
// commit the kv snapshot asynchronously
+ TableBucket tb = event.getTableBucket();
+ TablePath tablePath =
coordinatorContext.getTablePathById(tb.getTableId());
ioExecutor.execute(
() -> {
try {
- TableBucket tb = event.getTableBucket();
CompletedSnapshot completedSnapshot =
event.getAddCompletedSnapshotData().getCompletedSnapshot();
// add completed snapshot
CompletedSnapshotStore completedSnapshotStore =
-
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tb);
+
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+ tablePath, tb);
// this involves IO operation (ZK), so we do it in
ioExecutor
completedSnapshotStore.add(completedSnapshot);
coordinatorEventManager.put(
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
index 309e3fcac..3e93cc46a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/CompletedSnapshotStore.java
@@ -87,6 +87,14 @@ public class CompletedSnapshotStore {
addSnapshotAndSubsumeOldestOne(completedSnapshot, snapshotsCleaner, ()
-> {});
}
+ public long getPhysicalStorageRemoteKvSize() {
+ return sharedKvFileRegistry.getFileSize();
+ }
+
+ public long getNumSnapshots() {
+ return completedSnapshots.size();
+ }
+
/**
* Synchronously writes the new snapshots to snapshot handle store and
asynchronously removes
* older ones.
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
index f0ac036bf..0c4314469 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/PeriodicSnapshotManager.java
@@ -21,8 +21,6 @@ import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.fs.FileSystemSafetyNet;
import org.apache.fluss.fs.FsPath;
import org.apache.fluss.metadata.TableBucket;
-import org.apache.fluss.metrics.MetricNames;
-import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.server.metrics.group.BucketMetricGroup;
import org.apache.fluss.utils.MathUtils;
import org.apache.fluss.utils.concurrent.Executors;
@@ -124,8 +122,6 @@ public class PeriodicSnapshotManager implements Closeable {
periodicSnapshotDelay > 0
? MathUtils.murmurHash(tableBucket.hashCode()) %
periodicSnapshotDelay
: 0;
-
- registerMetrics(bucketMetricGroup);
}
public static PeriodicSnapshotManager create(
@@ -156,9 +152,8 @@ public class PeriodicSnapshotManager implements Closeable {
}
}
- private void registerMetrics(BucketMetricGroup bucketMetricGroup) {
- MetricGroup metricGroup =
bucketMetricGroup.addGroup("kv").addGroup("snapshot");
- metricGroup.gauge(MetricNames.KV_LATEST_SNAPSHOT_SIZE,
target::getSnapshotSize);
+ public long getSnapshotSize() {
+ return target.getSnapshotSize();
}
// schedule thread and asyncOperationsThreadPool can access this method
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java
index e2699cb32..9d13d3e0e 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SharedKvFileRegistry.java
@@ -53,6 +53,9 @@ public class SharedKvFileRegistry implements AutoCloseable {
/** This flag indicates whether or not the registry is open or if close()
was called. */
private boolean open;
+ /** The total size of all kv files registered in this registry. */
+ private volatile long fileSize;
+
/** Executor for async kv deletion. */
private final Executor asyncDisposalExecutor;
@@ -64,6 +67,11 @@ public class SharedKvFileRegistry implements AutoCloseable {
this.registeredKvEntries = new HashMap<>();
this.asyncDisposalExecutor = checkNotNull(asyncDisposalExecutor);
this.open = true;
+ this.fileSize = 0L;
+ }
+
+ public long getFileSize() {
+ return fileSize;
}
public KvFileHandle registerReference(
@@ -87,6 +95,7 @@ public class SharedKvFileRegistry implements AutoCloseable {
LOG.trace("Registered new kv file {} under key {}.",
newHandle, registrationKey);
entry = new SharedKvEntry(newHandle, snapshotID);
registeredKvEntries.put(registrationKey, entry);
+ fileSize += newHandle.getSize();
// no further handling
return entry.kvFileHandle;
@@ -134,6 +143,7 @@ public class SharedKvFileRegistry implements AutoCloseable {
if (entry.lastUsedSnapshotID < lowestSnapshotID) {
subsumed.add(entry.kvFileHandle);
it.remove();
+ fileSize -= entry.kvFileHandle.getSize();
}
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index c23606850..1470081a8 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -117,6 +117,8 @@ public final class LogTablet {
private volatile long remoteLogStartOffset = Long.MAX_VALUE;
// tracking the log end offset in remote storage
private volatile long remoteLogEndOffset = -1L;
+ // tracking the log size in remote storage
+ private volatile long remoteLogSize = 0;
// tracking the log start/end offset in lakehouse storage
private volatile long lakeTableSnapshotId = -1;
@@ -339,7 +341,20 @@ public final class LogTablet {
metricGroup.gauge(
MetricNames.LOG_NUM_SEGMENTS, () ->
localLog.getSegments().numberOfSegments());
metricGroup.gauge(MetricNames.LOG_END_OFFSET,
localLog::getLocalLogEndOffset);
- metricGroup.gauge(MetricNames.LOG_SIZE, () ->
localLog.getSegments().sizeInBytes());
+ }
+
+ public long logSize() {
+ return localLog.getSegments().sizeInBytes();
+ }
+
+ public long logicalStorageSize() {
+ if (remoteLogEndOffset <= 0L) {
+ return localLog.getSegments().sizeInBytes();
+ } else {
+ return
localLog.getSegments().higherSegments(remoteLogEndOffset).stream()
+ .mapToLong(LogSegment::getSizeInBytes)
+ .reduce(remoteLogSize, Long::sum);
+ }
}
public void updateLeaderEndOffsetSnapshot() {
@@ -470,6 +485,10 @@ public final class LogTablet {
}
}
+ public void updateRemoteLogSize(long remoteLogSize) {
+ this.remoteLogSize = remoteLogSize;
+ }
+
public void updateRemoteLogEndOffset(long remoteLogEndOffset) {
if (remoteLogEndOffset > this.remoteLogEndOffset) {
this.remoteLogEndOffset = remoteLogEndOffset;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
index 0e3666509..6df83faa4 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/LogTieringTask.java
@@ -322,6 +322,7 @@ public class LogTieringTask implements Runnable {
// to try to commit this snapshot.
long newRemoteLogStartOffset =
newRemoteLogManifest.getRemoteLogStartOffset();
long newRemoteLogEndOffset =
newRemoteLogManifest.getRemoteLogEndOffset();
+ long newRemoteLogSize = newRemoteLogManifest.getRemoteLogSize();
int retrySendCommitTimes = 1;
while (retrySendCommitTimes <= 10) {
try {
@@ -356,6 +357,7 @@ public class LogTieringTask implements Runnable {
logTablet.updateRemoteLogStartOffset(newRemoteLogStartOffset);
// make the local log cleaner clean log segments that are
committed to remote.
logTablet.updateRemoteLogEndOffset(newRemoteLogEndOffset);
+ logTablet.updateRemoteLogSize(newRemoteLogSize);
return true;
}
} catch (Exception e) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
index cc23be0be..d963f4fcf 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManager.java
@@ -150,6 +150,7 @@ public class RemoteLogManager implements Closeable {
}
remoteLog.getRemoteLogEndOffset().ifPresent(log::updateRemoteLogEndOffset);
log.updateRemoteLogStartOffset(remoteLog.getRemoteLogStartOffset());
+ log.updateRemoteLogSize(remoteLog.getRemoteSizeInBytes());
// leader needs to register the remote log metrics
remoteLog.registerMetrics(replica.bucketMetrics());
remoteLogs.put(tableBucket, remoteLog);
@@ -158,6 +159,10 @@ public class RemoteLogManager implements Closeable {
LOG.debug("Added the remote log tiering task for replica {}",
tableBucket);
}
+ public long getRemoteLogSize() {
+ return
remoteLogs.values().stream().mapToLong(RemoteLogTablet::getRemoteSizeInBytes).sum();
+ }
+
/** Stop the log tiering task for the given replica. */
public void stopLogTiering(Replica replica) {
if (remoteDisabled()) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManifest.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManifest.java
index c6bc380e7..e1478dec4 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManifest.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogManifest.java
@@ -96,6 +96,14 @@ public class RemoteLogManifest {
return endOffset;
}
+ public long getRemoteLogSize() {
+ long size = 0;
+ for (RemoteLogSegment remoteLogSegment : remoteLogSegmentList) {
+ size += remoteLogSegment.segmentSizeInBytes();
+ }
+ return size;
+ }
+
public byte[] toJsonBytes() {
return RemoteLogManifestJsonSerde.toJson(this);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java
index 017ef9383..75892ed55 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/log/remote/RemoteLogTablet.java
@@ -122,11 +122,15 @@ public class RemoteLogTablet {
MetricGroup metricGroup =
bucketMetricGroup.addGroup("remoteLog");
metricGroup.gauge(MetricNames.LOG_NUM_SEGMENTS, () ->
numRemoteLogSegments);
metricGroup.gauge(MetricNames.LOG_END_OFFSET, () ->
remoteLogEndOffset);
- metricGroup.gauge(MetricNames.LOG_SIZE, () ->
remoteSizeInBytes);
+ metricGroup.gauge(MetricNames.REMOTE_LOG_SIZE,
this::getRemoteSizeInBytes);
remoteLogMetrics = metricGroup;
});
}
+ public long getRemoteSizeInBytes() {
+ return remoteSizeInBytes;
+ }
+
public void unregisterMetrics() {
inWriteLock(
lock,
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
index 251fb462b..4045fbae7 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java
@@ -17,19 +17,34 @@
package org.apache.fluss.server.metrics.group;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
import org.apache.fluss.utils.MapUtils;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+
+import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
/** The metric group for coordinator server. */
public class CoordinatorMetricGroup extends AbstractMetricGroup {
private static final String NAME = "coordinator";
+ private final Map<TablePath, SimpleTableMetricGroup> metricGroupByTable =
+ MapUtils.newConcurrentHashMap();
+
protected final String clusterId;
protected final String hostname;
protected final String serverId;
@@ -62,4 +77,161 @@ public class CoordinatorMetricGroup extends
AbstractMetricGroup {
return eventMetricGroups.computeIfAbsent(
eventClass, e -> new CoordinatorEventMetricGroup(registry,
eventClass, this));
}
+
+ // ------------------------------------------------------------------------
+ // table buckets groups
+ // ------------------------------------------------------------------------
+
+ public @Nullable MetricGroup getTableBucketMetricGroup(
+ TablePath tablePath, TableBucket tableBucket) {
+ SimpleTableMetricGroup tableMetricGroup =
metricGroupByTable.get(tablePath);
+ if (tableMetricGroup == null) {
+ return null;
+ }
+ return tableMetricGroup.buckets.get(tableBucket);
+ }
+
+ public void addTableBucketMetricGroup(
+ PhysicalTablePath physicalTablePath,
+ long tableId,
+ @Nullable Long partitionId,
+ Set<Integer> buckets) {
+ TablePath tablePath = physicalTablePath.getTablePath();
+ SimpleTableMetricGroup tableMetricGroup =
+ metricGroupByTable.computeIfAbsent(
+ tablePath, table -> new
SimpleTableMetricGroup(registry, tablePath, this));
+ buckets.forEach(
+ bucket ->
+ tableMetricGroup.addBucketMetricGroup(
+ physicalTablePath.getPartitionName(),
+ new TableBucket(tableId, partitionId,
bucket)));
+ }
+
+ public void removeTableMetricGroup(TablePath tablePath, long tableId) {
+ SimpleTableMetricGroup tableMetricGroup =
metricGroupByTable.remove(tablePath);
+ if (tableMetricGroup != null) {
+ tableMetricGroup.removeBucketMetricsGroupForTable(tableId);
+ tableMetricGroup.close();
+ }
+ }
+
+ public void removeTablePartitionMetricsGroup(
+ TablePath tablePath, long tableId, long partitionId) {
+ SimpleTableMetricGroup tableMetricGroup =
metricGroupByTable.get(tablePath);
+ if (tableMetricGroup != null) {
+ tableMetricGroup.removeBucketMetricsGroupForPartition(tableId,
partitionId);
+ }
+ }
+
+ /** The metric group for table. */
+ private static class SimpleTableMetricGroup extends AbstractMetricGroup {
+
+ private final Map<TableBucket, SimpleBucketMetricGroup> buckets = new
HashMap<>();
+
+ private final TablePath tablePath;
+
+ private final MetricRegistry registry;
+
+ public SimpleTableMetricGroup(
+ MetricRegistry registry,
+ TablePath tablePath,
+ AbstractMetricGroup serverMetricGroup) {
+ super(
+ registry,
+ makeScope(
+ serverMetricGroup,
+ tablePath.getDatabaseName(),
+ tablePath.getTableName()),
+ serverMetricGroup);
+
+ this.tablePath = tablePath;
+ this.registry = registry;
+ }
+
+ @Override
+ protected void putVariables(Map<String, String> variables) {
+ variables.put("database", tablePath.getDatabaseName());
+ variables.put("table", tablePath.getTableName());
+ }
+
+ @Override
+ protected String getGroupName(CharacterFilter filter) {
+ // partition and table share same logic group name
+ return "table";
+ }
+
+ //
------------------------------------------------------------------------
+ // bucket groups
+ //
------------------------------------------------------------------------
+ public void addBucketMetricGroup(@Nullable String partitionName,
TableBucket tableBucket) {
+ buckets.computeIfAbsent(
+ tableBucket,
+ (bucket) ->
+ new SimpleBucketMetricGroup(
+ registry, partitionName,
tableBucket.getBucket(), this));
+ }
+
+ public void removeBucketMetricsGroupForTable(long tableId) {
+ List<TableBucket> tableBuckets = new ArrayList<>();
+ buckets.forEach(
+ (tableBucket, bucketMetricGroup) -> {
+ if (tableBucket.getTableId() == tableId) {
+ tableBuckets.add(tableBucket);
+ }
+ });
+ tableBuckets.forEach(this::removeBucketMetricGroup);
+ }
+
+ public void removeBucketMetricsGroupForPartition(long tableId, long
partitionId) {
+ List<TableBucket> tableBuckets = new ArrayList<>();
+ buckets.forEach(
+ (tableBucket, bucketMetricGroup) -> {
+ Long bucketPartitionId = tableBucket.getPartitionId();
+ if (tableBucket.getTableId() == tableId
+ && bucketPartitionId != null
+ && bucketPartitionId == partitionId) {
+ tableBuckets.add(tableBucket);
+ }
+ });
+ tableBuckets.forEach(this::removeBucketMetricGroup);
+ }
+
+ public void removeBucketMetricGroup(TableBucket tb) {
+ SimpleBucketMetricGroup metricGroup = buckets.remove(tb);
+ metricGroup.close();
+ }
+ }
+
+ /** The metric group for bucket. */
+ private static class SimpleBucketMetricGroup extends AbstractMetricGroup {
+ // will be null if the bucket doesn't belong to a partition
+ private final @Nullable String partitionName;
+ private final int bucket;
+
+ public SimpleBucketMetricGroup(
+ MetricRegistry registry,
+ @Nullable String partitionName,
+ int bucket,
+ SimpleTableMetricGroup parent) {
+ super(registry, makeScope(parent, String.valueOf(bucket)), parent);
+ this.partitionName = partitionName;
+ this.bucket = bucket;
+ }
+
+ @Override
+ protected void putVariables(Map<String, String> variables) {
+ if (partitionName != null) {
+ variables.put("partition", partitionName);
+ } else {
+ // value of empty string indicates non-partitioned tables
+ variables.put("partition", "");
+ }
+ variables.put("bucket", String.valueOf(bucket));
+ }
+
+ @Override
+ protected String getGroupName(CharacterFilter filter) {
+ return "bucket";
+ }
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 44e44bfe6..90c1630f8 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -38,6 +38,8 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.Counter;
+import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.record.DefaultValueRecordBatch;
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.LogRecords;
@@ -192,6 +194,7 @@ public final class Replica {
// null if table without pk or haven't become leader
private volatile @Nullable KvTablet kvTablet;
private volatile @Nullable CloseableRegistry closeableRegistryForKv;
+ private @Nullable PeriodicSnapshotManager kvSnapshotManager;
// ------- metrics
private Counter isrShrinks;
@@ -250,6 +253,31 @@ public final class Replica {
isrExpands = serverMetrics.isrExpands();
isrShrinks = serverMetrics.isrShrinks();
failedIsrUpdates = serverMetrics.failedIsrUpdates();
+
+ // logical storage metrics.
+ MetricGroup logicalStorageMetrics =
bucketMetricGroup.addGroup("logicalStorage");
+ logicalStorageMetrics.gauge(
+ MetricNames.LOCAL_STORAGE_LOG_SIZE,
this::logicalStorageLogSize);
+ logicalStorageMetrics.gauge(MetricNames.LOCAL_STORAGE_KV_SIZE,
this::logicalStorageKvSize);
+ }
+
+ public long logicalStorageLogSize() {
+ if (isLeader()) {
+ return logTablet.logicalStorageSize();
+ } else {
+ // follower doesn't need to report the logical storage size.
+ return 0L;
+ }
+ }
+
+ public long logicalStorageKvSize() {
+ if (isLeader() && isKvTable()) {
+ checkNotNull(kvSnapshotManager, "kvSnapshotManager is null");
+ return kvSnapshotManager.getSnapshotSize();
+ } else {
+ // follower doesn't need to report the logical storage size.
+ return 0L;
+ }
}
public boolean isKvTable() {
@@ -792,7 +820,7 @@ public final class Replica {
coordinatorEpochSupplier,
lastCompletedSnapshotLogOffset,
snapshotSize);
- PeriodicSnapshotManager kvSnapshotManager =
+ this.kvSnapshotManager =
PeriodicSnapshotManager.create(
tableBucket,
kvTabletSnapshotTarget,
@@ -806,6 +834,14 @@ public final class Replica {
}
}
+ public long getLatestKvSnapshotSize() {
+ if (kvSnapshotManager == null) {
+ return 0L;
+ } else {
+ return kvSnapshotManager.getSnapshotSize();
+ }
+ }
+
public long getLeaderEndOffsetSnapshot() {
return logTablet.getLeaderEndOffsetSnapshot();
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 692bd4c2a..876e8e984 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -34,6 +34,7 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.record.KvRecordBatch;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.remote.RemoteLogFetchInfo;
@@ -306,6 +307,19 @@ public class ReplicaManager {
serverMetricGroup.gauge(MetricNames.UNDER_REPLICATED,
this::underReplicatedCount);
serverMetricGroup.gauge(MetricNames.UNDER_MIN_ISR,
this::underMinIsrCount);
serverMetricGroup.gauge(MetricNames.AT_MIN_ISR, this::atMinIsrCount);
+
+ MetricGroup logicalStorage =
serverMetricGroup.addGroup("logicalStorage");
+ logicalStorage.gauge(
+ MetricNames.SERVER_LOGICAL_STORAGE_LOG_SIZE,
this::logicalStorageLogSize);
+ logicalStorage.gauge(
+ MetricNames.SERVER_LOGICAL_STORAGE_KV_SIZE,
this::logicalStorageKvSize);
+
+ MetricGroup physicalStorage =
serverMetricGroup.addGroup("physicalStorage");
+ physicalStorage.gauge(
+ MetricNames.SERVER_PHYSICAL_STORAGE_LOCAL_SIZE,
this::physicalStorageLocalSize);
+ physicalStorage.gauge(
+ MetricNames.SERVER_PHYSICAL_STORAGE_REMOTE_LOG_SIZE,
+ this::physicalStorageRemoteLogSize);
}
private Stream<Replica> onlineReplicas() {
@@ -338,6 +352,31 @@ public class ReplicaManager {
return onlineReplicas().map(Replica::writerIdCount).reduce(0,
Integer::sum);
}
+ private long logicalStorageLogSize() {
+ return onlineReplicas().map(Replica::logicalStorageLogSize).reduce(0L,
Long::sum);
+ }
+
+ private long logicalStorageKvSize() {
+ return onlineReplicas().map(Replica::logicalStorageKvSize).reduce(0L,
Long::sum);
+ }
+
+ private long physicalStorageLocalSize() {
+ return onlineReplicas()
+ .mapToLong(
+ replica -> {
+ long size = replica.getLogTablet().logSize();
+ if (replica.isKvTable()) {
+ size += replica.getLatestKvSnapshotSize();
+ }
+ return size;
+ })
+ .reduce(0L, Long::sum);
+ }
+
+ private long physicalStorageRemoteLogSize() {
+ return remoteLogManager.getRemoteLogSize();
+ }
+
/**
* Receive a request to make these replicas to become leader or follower,
if the replica doesn't
* exit, we will create it.
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
index 4ed6617fc..b8fec59ca 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CompletedSnapshotStoreManagerTest.java
@@ -24,6 +24,7 @@ import
org.apache.fluss.server.kv.snapshot.CompletedSnapshotHandleStore;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
import org.apache.fluss.server.kv.snapshot.TestingCompletedSnapshotHandle;
import
org.apache.fluss.server.kv.snapshot.ZooKeeperCompletedSnapshotHandleStore;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.apache.fluss.server.testutils.KvTestUtils;
import org.apache.fluss.server.zk.NOPErrorHandler;
import org.apache.fluss.server.zk.ZooKeeperClient;
@@ -53,6 +54,7 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link CompletedSnapshotStoreManager}. */
@@ -117,7 +119,8 @@ class CompletedSnapshotStoreManagerTest {
// check has retain number of snapshots
assertThat(
completedSnapshotStoreManager
-
.getOrCreateCompletedSnapshotStore(tableBucket)
+ .getOrCreateCompletedSnapshotStore(
+ DATA1_TABLE_PATH, tableBucket)
.getAllSnapshots())
.hasSize(maxNumberOfSnapshotsToRetain);
}
@@ -144,7 +147,8 @@ class CompletedSnapshotStoreManagerTest {
// check has retain number of snapshots
assertThat(
completedSnapshotStoreManager
-
.getOrCreateCompletedSnapshotStore(tableBucket)
+ .getOrCreateCompletedSnapshotStore(
+ DATA1_TABLE_PATH, tableBucket)
.getAllSnapshots())
.hasSize(maxNumberOfSnapshotsToRetain);
}
@@ -153,7 +157,7 @@ class CompletedSnapshotStoreManagerTest {
TableBucket nonExistBucket = new TableBucket(10, 100);
assertThat(
completedSnapshotStoreManager
-
.getOrCreateCompletedSnapshotStore(nonExistBucket)
+
.getOrCreateCompletedSnapshotStore(DATA1_TABLE_PATH, nonExistBucket)
.getAllSnapshots())
.hasSize(0);
}
@@ -206,11 +210,13 @@ class CompletedSnapshotStoreManagerTest {
10,
ioExecutor,
zookeeperClient,
- zooKeeperClient -> completedSnapshotHandleStore);
+ zooKeeperClient -> completedSnapshotHandleStore,
+ TestingMetricGroups.COORDINATOR_METRICS);
// Verify that only the valid snapshot remains
CompletedSnapshotStore completedSnapshotStore =
-
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tableBucket);
+
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+ DATA1_TABLE_PATH, tableBucket);
assertThat(completedSnapshotStore.getAllSnapshots()).hasSize(1);
assertThat(completedSnapshotStore.getAllSnapshots().get(0).getSnapshotID()).isEqualTo(1L);
}
@@ -218,13 +224,17 @@ class CompletedSnapshotStoreManagerTest {
private CompletedSnapshotStoreManager createCompletedSnapshotStoreManager(
int maxNumberOfSnapshotsToRetain) {
return new CompletedSnapshotStoreManager(
- maxNumberOfSnapshotsToRetain, ioExecutor, zookeeperClient);
+ maxNumberOfSnapshotsToRetain,
+ ioExecutor,
+ zookeeperClient,
+ TestingMetricGroups.COORDINATOR_METRICS);
}
private CompletedSnapshot getLatestCompletedSnapshot(
CompletedSnapshotStoreManager completedSnapshotStoreManager,
TableBucket tableBucket) {
CompletedSnapshotStore completedSnapshotStore =
-
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tableBucket);
+
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+ DATA1_TABLE_PATH, tableBucket);
return completedSnapshotStore.getLatestSnapshot().get();
}
@@ -234,7 +244,8 @@ class CompletedSnapshotStoreManagerTest {
throws Exception {
TableBucket tableBucket = completedSnapshot.getTableBucket();
CompletedSnapshotStore completedSnapshotStore =
-
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(tableBucket);
+
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+ DATA1_TABLE_PATH, tableBucket);
completedSnapshotStore.add(completedSnapshot);
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 82cd8f1a3..09a833e41 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -221,7 +221,7 @@ class CoordinatorEventProcessorTest {
// mock CompletedSnapshotStore
for (TableBucket tableBucket : allTableBuckets(t1Id, nBuckets)) {
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
- new TableBucket(tableBucket.getTableId(),
tableBucket.getBucket()));
+ t1, new TableBucket(tableBucket.getTableId(),
tableBucket.getBucket()));
}
assertThat(completedSnapshotStoreManager.getBucketCompletedSnapshotStores()).isNotEmpty();
@@ -606,6 +606,7 @@ class CoordinatorEventProcessorTest {
// mock CompletedSnapshotStore for partition1
for (TableBucket tableBucket : allTableBuckets(tableId, partition1Id,
nBuckets)) {
completedSnapshotStoreManager.getOrCreateCompletedSnapshotStore(
+ tablePath,
new TableBucket(
tableBucket.getTableId(),
tableBucket.getPartitionId(),
diff --git a/website/docs/maintenance/observability/monitor-metrics.md
b/website/docs/maintenance/observability/monitor-metrics.md
index a01f9e67c..6b01cceda 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="9"><strong>coordinator</strong></th>
+ <th rowspan="12"><strong>coordinator</strong></th>
<td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="7">-</td>
<td>activeCoordinatorCount</td>
<td>The number of active CoordinatorServer in this cluster.</td>
@@ -331,7 +331,7 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>Histogram</td>
</tr>
<tr>
- <td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="2">event</td>
+ <td rowspan="2">event</td>
<td>eventQueueSize</td>
<td>The number of events currently waiting to be processed in the
coordinator event queue. This metric is labeled with <code>event_type</code> to
distinguish between different types of coordinator events.</td>
<td>Gauge</td>
@@ -341,6 +341,23 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>The time that an event took to be processed by the coordinator event
processor. This metric is labeled with <code>event_type</code> to distinguish
between different types of coordinator events.</td>
<td>Histogram</td>
</tr>
+ <tr>
+ <td rowspan="1">physicalStorage</td>
+ <td>remoteKvSize</td>
+ <td>The physical storage size of the remote KV store.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td rowspan="2">table_bucket</td>
+ <td>numKvSnapshots</td>
+ <td>The number of kv snapshots for each table bucket.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>allKvSnapshotSize</td>
+ <td>The total size of all kv snapshots for each table bucket.</td>
+ <td>Gauge</td>
+ </tr>
</tbody>
</table>
@@ -358,7 +375,7 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="25"><strong>tabletserver</strong></th>
+ <th rowspan="29"><strong>tabletserver</strong></th>
<td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="25">-</td>
<td>messagesInPerSecond</td>
<td>The number of messages written per second to this server.</td>
@@ -484,6 +501,28 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>The number of kv pre-write buffer truncate due to the error happened
when writing cdc to log per second.</td>
<td>Meter</td>
</tr>
+ <tr>
+ <td rowspan="2">logicalStorage</td>
+ <td>logSize</td>
+ <td>The logical storage size of the log data managed by this TabletServer.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>kvSize</td>
+ <td>The logical storage size of the kv data managed by this TabletServer.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td rowspan="2">physicalStorage</td>
+ <td>localSize</td>
+ <td>The physical local storage size of this TabletServer.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>remoteLogSize</td>
+ <td>The physical remote log size managed by this TabletServer.</td>
+ <td>Gauge</td>
+ </tr>
</tbody>
</table>
@@ -607,7 +646,7 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="30"><strong>tabletserver</strong></th>
+ <th rowspan="28"><strong>tabletserver</strong></th>
<td rowspan="20">table</td>
<td>messagesInPerSecond</td>
<td>The number of messages written per second to this table.</td>
@@ -709,7 +748,7 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>Meter</td>
</tr>
<tr>
- <td rowspan="3">table_bucket_log</td>
+ <td rowspan="2">table_bucket_log</td>
<td>numSegments</td>
<td>The number of segments in local storage for this table bucket.</td>
<td>Gauge</td>
@@ -718,11 +757,6 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>endOffset</td>
<td>The end offset in local storage for this table bucket.</td>
<td>Gauge</td>
- </tr>
- <tr>
- <td>size</td>
- <td>The total log sizes in local storage for this table bucket.</td>
- <td>Gauge</td>
</tr>
<tr>
<td rowspan="3">table_bucket_remoteLog</td>
@@ -740,12 +774,6 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>The number of bytes written per second to this table.</td>
<td>Gauge</td>
</tr>
- <tr>
- <td rowspan="1">table_bucket_kv_snapshot</td>
- <td>latestSnapshotSize</td>
- <td>The latest kv snapshot size in bytes for this table bucket.</td>
- <td>Gauge</td>
- </tr>
</tbody>
</table>