This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 7ffcbf052 [lake] Introduce lake lag metrics for datalake enabled table 
(#1729)
7ffcbf052 is described below

commit 7ffcbf0521e4d16c5035f8af1d1e92aee51531c7
Author: CaoZhen <[email protected]>
AuthorDate: Sat Sep 27 23:19:23 2025 +0800

    [lake] Introduce lake lag metrics for datalake enabled table (#1729)
---
 .../java/org/apache/fluss/metrics/MetricNames.java |  2 ++
 .../org/apache/fluss/server/replica/Replica.java   | 25 ++++++++++++++++++++++
 .../maintenance/observability/monitor-metrics.md   | 11 ++++++++++
 3 files changed, 38 insertions(+)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java 
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 6cb4e72e1..be8d644e7 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -133,6 +133,8 @@ public class MetricNames {
     public static final String LOG_NUM_SEGMENTS = "numSegments";
     public static final String LOG_END_OFFSET = "endOffset";
     public static final String REMOTE_LOG_SIZE = "size";
+    public static final String LOG_LAKE_PENDING_RECORDS = "pendingRecords";
+    public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag";
 
     // for logic storage
     public static final String LOCAL_STORAGE_LOG_SIZE = "logSize";
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 091673085..3b9b11c47 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -201,6 +201,8 @@ public final class Replica {
     private Counter isrExpands;
     private Counter failedIsrUpdates;
 
+    private MetricGroup lakeTieringMetricGroup;
+
     public Replica(
             PhysicalTablePath physicalPath,
             TableBucket tableBucket,
@@ -536,6 +538,10 @@ public final class Replica {
     private void onBecomeNewLeader() {
         updateLeaderEndOffsetSnapshot();
 
+        if (isDataLakeEnabled()) {
+            registerLakeTieringMetrics();
+        }
+
         if (isKvTable()) {
             // if it's become new leader, we must
             // first destroy the old kv tablet
@@ -546,11 +552,30 @@ public final class Replica {
         }
     }
 
+    private void registerLakeTieringMetrics() {
+        lakeTieringMetricGroup = bucketMetricGroup.addGroup("lakeTiering");
+        lakeTieringMetricGroup.gauge(
+                MetricNames.LOG_LAKE_PENDING_RECORDS,
+                () ->
+                        getLakeLogEndOffset() < 0L
+                                ? -1
+                                : getLogHighWatermark() - 
getLakeLogEndOffset());
+        lakeTieringMetricGroup.gauge(
+                MetricNames.LOG_LAKE_TIMESTAMP_LAG,
+                () ->
+                        logTablet.getLakeMaxTimestamp() < 0L
+                                ? -1
+                                : logTablet.localMaxTimestamp() - 
logTablet.getLakeMaxTimestamp());
+    }
+
     private void onBecomeNewFollower() {
         if (isKvTable()) {
             // it should be from leader to follower, we need to destroy the kv 
tablet
             dropKv();
         }
+        if (lakeTieringMetricGroup != null) {
+            lakeTieringMetricGroup.close();
+        }
     }
 
     @VisibleForTesting
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index 95e1fec4a..6f2eb5156 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -762,6 +762,17 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>endOffset</td>
       <td>The end offset in local storage for this table bucket.</td>
       <td>Gauge</td>
+    </tr>
+     <tr>
+      <td rowspan="2">table_bucket_lakeTiering</td>
+      <td>pendingRecords</td>
+      <td>The number of records lag between local log and remote log for this 
table bucket.</td>
+      <td>Gauge</td>
+    </tr>
+     <tr>
+      <td>timestampLag</td>
+      <td>The timestamp lag between local log and remote log for this table 
bucket in milliseconds.</td>
+      <td>Gauge</td>
     </tr>
     <tr>
       <td rowspan="3">table_bucket_remoteLog</td>

Reply via email to