This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 79fd2eb8a [lake/tiering] Add CoordinatorServer Monitoring Metrics for Lake Tiering Scheduling (#2815)
79fd2eb8a is described below

commit 79fd2eb8af2cafcd367a03a4be791614f2a6fb05
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Mar 9 19:08:58 2026 +0800

    [lake/tiering] Add CoordinatorServer Monitoring Metrics for Lake Tiering Scheduling (#2815)
---
 .../java/org/apache/fluss/metrics/MetricNames.java |  4 ++
 .../server/coordinator/CoordinatorServer.java      |  5 +-
 .../coordinator/LakeTableTieringManager.java       | 68 ++++++++++++++++------
 .../metrics/group/LakeTieringMetricGroup.java      | 40 +++++++++++++
 .../coordinator/CoordinatorEventProcessorTest.java |  3 +-
 .../coordinator/LakeTableTieringManagerTest.java   | 42 ++++++++++++-
 .../rebalance/RebalanceManagerTest.java            |  3 +-
 .../statemachine/TableBucketStateMachineTest.java  |  3 +-
 .../server/metrics/group/TestingMetricGroups.java  |  3 +
 .../maintenance/observability/monitor-metrics.md   | 13 ++++-
 10 files changed, 159 insertions(+), 25 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index e1a5e28a6..1b1445665 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -59,6 +59,10 @@ public class MetricNames {
     public static final String KV_SNAPSHOT_LEASE_COUNT = 
"kvSnapshotLeaseCount";
     public static final String LEASED_KV_SNAPSHOT_COUNT = 
"leasedKvSnapshotCount";
 
+    // for lake tiering metrics - global level
+    public static final String LAKE_TIERING_PENDING_TABLES_COUNT = 
"pendingTablesCount";
+    public static final String LAKE_TIERING_RUNNING_TABLES_COUNT = 
"runningTablesCount";
+
     // 
--------------------------------------------------------------------------------------------
     // metrics for tablet server
     // 
--------------------------------------------------------------------------------------------
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index ca3708bfe..774c23cce 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -39,6 +39,7 @@ import 
org.apache.fluss.server.metadata.CoordinatorMetadataCache;
 import org.apache.fluss.server.metadata.ServerMetadataCache;
 import org.apache.fluss.server.metrics.ServerMetricUtils;
 import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.ZooKeeperUtils;
 import org.apache.fluss.server.zk.data.CoordinatorAddress;
@@ -200,7 +201,9 @@ public class CoordinatorServer extends ServerBase {
                 authorizer.startup();
             }
 
-            this.lakeTableTieringManager = new LakeTableTieringManager();
+            this.lakeTableTieringManager =
+                    new LakeTableTieringManager(
+                            new LakeTieringMetricGroup(metricRegistry, 
serverMetricGroup));
 
             MetadataManager metadataManager =
                     new MetadataManager(zkClient, conf, 
lakeCatalogDynamicLoader);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
index cb0c91634..6d412f5b7 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
@@ -23,7 +23,9 @@ import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.MetricNames;
 import org.apache.fluss.server.entity.LakeTieringTableInfo;
+import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
 import org.apache.fluss.server.utils.timer.DefaultTimer;
 import org.apache.fluss.server.utils.timer.Timer;
 import org.apache.fluss.server.utils.timer.TimerTask;
@@ -50,10 +52,11 @@ import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
 
 /**
  * A manager to manage the tables to be tiered.
@@ -135,21 +138,25 @@ public class LakeTableTieringManager implements 
AutoCloseable {
     // table_id -> delayed tiering task
     private final Map<Long, DelayedTiering> delayedTieringByTableId;
 
-    private final Lock lock = new ReentrantLock();
+    private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
 
-    public LakeTableTieringManager() {
+    private final LakeTieringMetricGroup tieringMetricGroup;
+
+    public LakeTableTieringManager(LakeTieringMetricGroup 
lakeTieringMetricGroup) {
         this(
                 new DefaultTimer("delay lake tiering", 1_000, 20),
                 Executors.newSingleThreadScheduledExecutor(
                         new 
ExecutorThreadFactory("fluss-lake-tiering-timeout-checker")),
-                SystemClock.getInstance());
+                SystemClock.getInstance(),
+                lakeTieringMetricGroup);
     }
 
     @VisibleForTesting
     protected LakeTableTieringManager(
             Timer lakeTieringScheduleTimer,
             ScheduledExecutorService lakeTieringServiceTimeoutChecker,
-            Clock clock) {
+            Clock clock,
+            LakeTieringMetricGroup lakeTieringMetricGroup) {
         this.lakeTieringScheduleTimer = lakeTieringScheduleTimer;
         this.lakeTieringServiceTimeoutChecker = 
lakeTieringServiceTimeoutChecker;
         this.clock = clock;
@@ -165,10 +172,21 @@ public class LakeTableTieringManager implements 
AutoCloseable {
         this.tableTierEpoch = new HashMap<>();
         this.tableLastTieredTime = new HashMap<>();
         this.delayedTieringByTableId = new HashMap<>();
+        this.tieringMetricGroup = lakeTieringMetricGroup;
+        registerMetrics();
+    }
+
+    private void registerMetrics() {
+        tieringMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT,
+                () -> inReadLock(lock, pendingTieringTables::size));
+        tieringMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT,
+                () -> inReadLock(lock, liveTieringTableIds::size));
     }
 
     public void initWithLakeTables(List<Tuple2<TableInfo, Long>> 
tableInfoWithTieredTime) {
-        inLock(
+        inWriteLock(
                 lock,
                 () -> {
                     for (Tuple2<TableInfo, Long> tableInfoAndLastLakeTime :
@@ -184,7 +202,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
     }
 
     public void addNewLakeTable(TableInfo tableInfo) {
-        inLock(
+        inWriteLock(
                 lock,
                 () -> {
                     registerLakeTable(tableInfo, clock.milliseconds());
@@ -225,7 +243,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
     }
 
     public void removeLakeTable(long tableId) {
-        inLock(
+        inWriteLock(
                 lock,
                 () -> {
                     tablePaths.remove(tableId);
@@ -250,7 +268,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
      * @param newFreshnessMs the new freshness interval in milliseconds
      */
     public void updateTableLakeFreshness(long tableId, long newFreshnessMs) {
-        inLock(
+        inWriteLock(
                 lock,
                 () -> {
                     Long currentFreshness = tableLakeFreshness.get(tableId);
@@ -286,7 +304,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
 
     @VisibleForTesting
     protected void checkTieringServiceTimeout() {
-        inLock(
+        inWriteLock(
                 lock,
                 () -> {
                     long currentTime = clock.milliseconds();
@@ -313,11 +331,11 @@ public class LakeTableTieringManager implements 
AutoCloseable {
 
     @Nullable
     public LakeTieringTableInfo requestTable() {
-        return inLock(
+        return inWriteLock(
                 lock,
                 () -> {
                     Long tableId = pendingTieringTables.poll();
-                    // no any pending table, return directly
+                    // now no any pending table, return directly
                     if (tableId == null) {
                         return null;
                     }
@@ -333,7 +351,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
     }
 
     public void finishTableTiering(long tableId, long tieredEpoch, boolean 
isForceFinished) {
-        inLock(
+        inWriteLock(
                 lock,
                 () -> {
                     validateTieringServiceRequest(tableId, tieredEpoch);
@@ -350,7 +368,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
     }
 
     public void reportTieringFail(long tableId, long tieringEpoch) {
-        inLock(
+        inWriteLock(
                 lock,
                 () -> {
                     validateTieringServiceRequest(tableId, tieringEpoch);
@@ -362,7 +380,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
     }
 
     public void renewTieringHeartbeat(long tableId, long tieringEpoch) {
-        inLock(
+        inWriteLock(
                 lock,
                 () -> {
                     validateTieringServiceRequest(tableId, tieringEpoch);
@@ -518,6 +536,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
         }
 
         lakeTieringScheduleTimer.shutdown();
+        tieringMetricGroup.close();
     }
 
     private class DelayedTiering extends TimerTask {
@@ -531,7 +550,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
 
         @Override
         public void run() {
-            inLock(
+            inWriteLock(
                     lock,
                     () -> {
                         // to pending state
@@ -558,7 +577,8 @@ public class LakeTableTieringManager implements 
AutoCloseable {
         }
     }
 
-    private enum TieringState {
+    @VisibleForTesting
+    enum TieringState {
         // When a new lake table is created, the state will be New
         New {
             @Override
@@ -615,4 +635,14 @@ public class LakeTableTieringManager implements 
AutoCloseable {
 
         abstract Set<TieringState> validPreviousStates();
     }
+
+    @VisibleForTesting
+    protected int getPendingTablesCount() {
+        return inReadLock(lock, pendingTieringTables::size);
+    }
+
+    @VisibleForTesting
+    protected int getRunningTablesCount() {
+        return inReadLock(lock, liveTieringTableIds::size);
+    }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
new file mode 100644
index 000000000..bc59b11ff
--- /dev/null
+++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
+
+/** Metrics for lake tiering. */
+public class LakeTieringMetricGroup extends AbstractMetricGroup {
+
+    private static final String NAME = "lakeTiering";
+
+    public LakeTieringMetricGroup(MetricRegistry registry, 
CoordinatorMetricGroup parent) {
+        super(registry, makeScope(parent, NAME), parent);
+    }
+
+    @Override
+    protected String getGroupName(CharacterFilter filter) {
+        return NAME;
+    }
+}
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 1f19d4da7..74fd01c5e 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -194,7 +194,8 @@ class CoordinatorEventProcessorTest {
         testCoordinatorChannelManager = new TestCoordinatorChannelManager();
         autoPartitionManager =
                 new AutoPartitionManager(serverMetadataCache, metadataManager, 
new Configuration());
-        lakeTableTieringManager = new LakeTableTieringManager();
+        lakeTableTieringManager =
+                new 
LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS);
         Configuration conf = new Configuration();
         String remoteDataDir = "/tmp/fluss/remote-data";
         conf.setString(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir);
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
index 80b589dc0..f0ab38f78 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.entity.LakeTieringTableInfo;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
 import org.apache.fluss.server.utils.timer.DefaultTimer;
 import 
org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
 import org.apache.fluss.types.DataTypes;
@@ -62,7 +63,8 @@ class LakeTableTieringManagerTest {
         return new LakeTableTieringManager(
                 new DefaultTimer("delay lake tiering", 1_000, 20, manualClock),
                 lakeTieringServiceTimeoutChecker,
-                manualClock);
+                manualClock,
+                TestingMetricGroups.LAKE_TIERING_METRICS);
     }
 
     @Test
@@ -238,6 +240,44 @@ class LakeTableTieringManagerTest {
         assertRequestTable(tableId1, tablePath1, 2);
     }
 
+    @Test
+    void testGlobalMetrics() throws Exception {
+        // Initially no tables - verify counts are 0
+        assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(0);
+        assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0);
+
+        // Add a table
+        long tableId1 = 1L;
+        TablePath tablePath1 = TablePath.of("db", "table1");
+        TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, 
Duration.ofSeconds(10));
+        tableTieringManager.addNewLakeTable(tableInfo1);
+
+        // Advance time to make it pending - need to wait for timer to trigger
+        manualClock.advanceTime(Duration.ofSeconds(10));
+
+        // Wait for the delayed task to execute and move to pending
+        waitValue(
+                () ->
+                        tableTieringManager.getPendingTablesCount() == 1
+                                ? Optional.of(1)
+                                : Optional.empty(),
+                Duration.ofSeconds(5),
+                "Table should be in pending state");
+
+        assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(1);
+        assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0);
+
+        // Request table - should transition to tiering
+        assertRequestTable(tableId1, tablePath1, 1);
+        assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(0);
+        assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(1);
+
+        // Report failure
+        tableTieringManager.reportTieringFail(tableId1, 1);
+        assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0);
+        assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(1); 
// back to pending
+    }
+
     @Test
     void testForceFinishTableTieringImmediatelyRePending() {
         long tableId1 = 1L;
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
index 43ed3ae78..a961dc393 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
@@ -93,7 +93,8 @@ public class RebalanceManagerTest {
 
         autoPartitionManager =
                 new AutoPartitionManager(serverMetadataCache, metadataManager, 
new Configuration());
-        lakeTableTieringManager = new LakeTableTieringManager();
+        lakeTableTieringManager =
+                new 
LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS);
         CoordinatorEventProcessor eventProcessor = 
buildCoordinatorEventProcessor();
         rebalanceManager = new RebalanceManager(eventProcessor, 
zookeeperClient);
         rebalanceManager.startup();
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
index 26dfeb8dc..b905ff969 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
@@ -122,7 +122,8 @@ class TableBucketStateMachineTest {
                                 new Configuration(),
                                 new LakeCatalogDynamicLoader(new 
Configuration(), null, true)),
                         new Configuration());
-        lakeTableTieringManager = new LakeTableTieringManager();
+        lakeTableTieringManager =
+                new 
LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS);
 
         kvSnapshotLeaseManager =
                 new KvSnapshotLeaseManager(
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
index b2e93f2bb..3bc6158df 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
@@ -34,6 +34,9 @@ public class TestingMetricGroups {
     public static final CoordinatorMetricGroup COORDINATOR_METRICS =
             new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1", 
"host", "0");
 
+    public static final LakeTieringMetricGroup LAKE_TIERING_METRICS =
+            new LakeTieringMetricGroup(NOPMetricRegistry.INSTANCE, 
COORDINATOR_METRICS);
+
     public static final TableMetricGroup TABLE_METRICS =
             new TableMetricGroup(
                     NOPMetricRegistry.INSTANCE,
diff --git a/website/docs/maintenance/observability/monitor-metrics.md b/website/docs/maintenance/observability/monitor-metrics.md
index 2ed317079..8bca85f66 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-      <th rowspan="17"><strong>coordinator</strong></th>
+       <th rowspan="19"><strong>coordinator</strong></th>
       <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="10">-</td>
       <td>activeCoordinatorCount</td>
       <td>The number of active CoordinatorServer in this cluster.</td>
@@ -384,6 +384,17 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>All kv snapshot size of each table bucket.</td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td rowspan="2">lakeTiering</td>
+      <td>pendingTablesCount</td>
+      <td>The number of tables waiting to be tiered.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>runningTablesCount</td>
+      <td>The number of tables currently being tiered.</td>
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 

Reply via email to