This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 79fd2eb8a [lake/tiering] Add CoordinatorServer Monitoring Metrics for
Lake Tiering Scheduling (#2815)
79fd2eb8a is described below
commit 79fd2eb8af2cafcd367a03a4be791614f2a6fb05
Author: Junbo Wang <[email protected]>
AuthorDate: Mon Mar 9 19:08:58 2026 +0800
[lake/tiering] Add CoordinatorServer Monitoring Metrics for Lake Tiering
Scheduling (#2815)
---
.../java/org/apache/fluss/metrics/MetricNames.java | 4 ++
.../server/coordinator/CoordinatorServer.java | 5 +-
.../coordinator/LakeTableTieringManager.java | 68 ++++++++++++++++------
.../metrics/group/LakeTieringMetricGroup.java | 40 +++++++++++++
.../coordinator/CoordinatorEventProcessorTest.java | 3 +-
.../coordinator/LakeTableTieringManagerTest.java | 42 ++++++++++++-
.../rebalance/RebalanceManagerTest.java | 3 +-
.../statemachine/TableBucketStateMachineTest.java | 3 +-
.../server/metrics/group/TestingMetricGroups.java | 3 +
.../maintenance/observability/monitor-metrics.md | 13 ++++-
10 files changed, 159 insertions(+), 25 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index e1a5e28a6..1b1445665 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -59,6 +59,10 @@ public class MetricNames {
public static final String KV_SNAPSHOT_LEASE_COUNT =
"kvSnapshotLeaseCount";
public static final String LEASED_KV_SNAPSHOT_COUNT =
"leasedKvSnapshotCount";
+ // for lake tiering metrics - global level
+ public static final String LAKE_TIERING_PENDING_TABLES_COUNT =
"pendingTablesCount";
+ public static final String LAKE_TIERING_RUNNING_TABLES_COUNT =
"runningTablesCount";
+
//
--------------------------------------------------------------------------------------------
// metrics for tablet server
//
--------------------------------------------------------------------------------------------
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index ca3708bfe..774c23cce 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -39,6 +39,7 @@ import
org.apache.fluss.server.metadata.CoordinatorMetadataCache;
import org.apache.fluss.server.metadata.ServerMetadataCache;
import org.apache.fluss.server.metrics.ServerMetricUtils;
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperUtils;
import org.apache.fluss.server.zk.data.CoordinatorAddress;
@@ -200,7 +201,9 @@ public class CoordinatorServer extends ServerBase {
authorizer.startup();
}
- this.lakeTableTieringManager = new LakeTableTieringManager();
+ this.lakeTableTieringManager =
+ new LakeTableTieringManager(
+ new LakeTieringMetricGroup(metricRegistry,
serverMetricGroup));
MetadataManager metadataManager =
new MetadataManager(zkClient, conf,
lakeCatalogDynamicLoader);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
index cb0c91634..6d412f5b7 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
@@ -23,7 +23,9 @@ import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.MetricNames;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
+import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
import org.apache.fluss.server.utils.timer.DefaultTimer;
import org.apache.fluss.server.utils.timer.Timer;
import org.apache.fluss.server.utils.timer.TimerTask;
@@ -50,10 +52,11 @@ import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
/**
* A manager to manage the tables to be tiered.
@@ -135,21 +138,25 @@ public class LakeTableTieringManager implements
AutoCloseable {
// table_id -> delayed tiering task
private final Map<Long, DelayedTiering> delayedTieringByTableId;
- private final Lock lock = new ReentrantLock();
+ private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
- public LakeTableTieringManager() {
+ private final LakeTieringMetricGroup tieringMetricGroup;
+
+ public LakeTableTieringManager(LakeTieringMetricGroup
lakeTieringMetricGroup) {
this(
new DefaultTimer("delay lake tiering", 1_000, 20),
Executors.newSingleThreadScheduledExecutor(
new
ExecutorThreadFactory("fluss-lake-tiering-timeout-checker")),
- SystemClock.getInstance());
+ SystemClock.getInstance(),
+ lakeTieringMetricGroup);
}
@VisibleForTesting
protected LakeTableTieringManager(
Timer lakeTieringScheduleTimer,
ScheduledExecutorService lakeTieringServiceTimeoutChecker,
- Clock clock) {
+ Clock clock,
+ LakeTieringMetricGroup lakeTieringMetricGroup) {
this.lakeTieringScheduleTimer = lakeTieringScheduleTimer;
this.lakeTieringServiceTimeoutChecker =
lakeTieringServiceTimeoutChecker;
this.clock = clock;
@@ -165,10 +172,21 @@ public class LakeTableTieringManager implements
AutoCloseable {
this.tableTierEpoch = new HashMap<>();
this.tableLastTieredTime = new HashMap<>();
this.delayedTieringByTableId = new HashMap<>();
+ this.tieringMetricGroup = lakeTieringMetricGroup;
+ registerMetrics();
+ }
+
+ /**
+ * Registers the global lake tiering gauges on {@code tieringMetricGroup}.
+ *
+ * <p>{@code pendingTablesCount} reports the size of {@code pendingTieringTables} and {@code
+ * runningTablesCount} reports the size of {@code liveTieringTableIds}. Both suppliers read the
+ * collections under the read lock: the gauges may be polled from a thread other than the ones
+ * that mutate this state under the write lock.
+ */
+ private void registerMetrics() {
+ tieringMetricGroup.gauge(
+ MetricNames.LAKE_TIERING_PENDING_TABLES_COUNT,
+ () -> inReadLock(lock, pendingTieringTables::size));
+ tieringMetricGroup.gauge(
+ MetricNames.LAKE_TIERING_RUNNING_TABLES_COUNT,
+ () -> inReadLock(lock, liveTieringTableIds::size));
}
public void initWithLakeTables(List<Tuple2<TableInfo, Long>>
tableInfoWithTieredTime) {
- inLock(
+ inWriteLock(
lock,
() -> {
for (Tuple2<TableInfo, Long> tableInfoAndLastLakeTime :
@@ -184,7 +202,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
}
public void addNewLakeTable(TableInfo tableInfo) {
- inLock(
+ inWriteLock(
lock,
() -> {
registerLakeTable(tableInfo, clock.milliseconds());
@@ -225,7 +243,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
}
public void removeLakeTable(long tableId) {
- inLock(
+ inWriteLock(
lock,
() -> {
tablePaths.remove(tableId);
@@ -250,7 +268,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
* @param newFreshnessMs the new freshness interval in milliseconds
*/
public void updateTableLakeFreshness(long tableId, long newFreshnessMs) {
- inLock(
+ inWriteLock(
lock,
() -> {
Long currentFreshness = tableLakeFreshness.get(tableId);
@@ -286,7 +304,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
@VisibleForTesting
protected void checkTieringServiceTimeout() {
- inLock(
+ inWriteLock(
lock,
() -> {
long currentTime = clock.milliseconds();
@@ -313,11 +331,11 @@ public class LakeTableTieringManager implements
AutoCloseable {
@Nullable
public LakeTieringTableInfo requestTable() {
- return inLock(
+ return inWriteLock(
lock,
() -> {
Long tableId = pendingTieringTables.poll();
- // no any pending table, return directly
+ // no pending table now, return directly
if (tableId == null) {
return null;
}
@@ -333,7 +351,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
}
public void finishTableTiering(long tableId, long tieredEpoch, boolean
isForceFinished) {
- inLock(
+ inWriteLock(
lock,
() -> {
validateTieringServiceRequest(tableId, tieredEpoch);
@@ -350,7 +368,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
}
public void reportTieringFail(long tableId, long tieringEpoch) {
- inLock(
+ inWriteLock(
lock,
() -> {
validateTieringServiceRequest(tableId, tieringEpoch);
@@ -362,7 +380,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
}
public void renewTieringHeartbeat(long tableId, long tieringEpoch) {
- inLock(
+ inWriteLock(
lock,
() -> {
validateTieringServiceRequest(tableId, tieringEpoch);
@@ -518,6 +536,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
}
lakeTieringScheduleTimer.shutdown();
+ tieringMetricGroup.close();
}
private class DelayedTiering extends TimerTask {
@@ -531,7 +550,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
@Override
public void run() {
- inLock(
+ inWriteLock(
lock,
() -> {
// to pending state
@@ -558,7 +577,8 @@ public class LakeTableTieringManager implements
AutoCloseable {
}
}
- private enum TieringState {
+ @VisibleForTesting
+ enum TieringState {
// When a new lake table is created, the state will be New
New {
@Override
@@ -615,4 +635,14 @@ public class LakeTableTieringManager implements
AutoCloseable {
abstract Set<TieringState> validPreviousStates();
}
+
+ /**
+ * Returns the number of tables currently queued in {@code pendingTieringTables}, read under the
+ * read lock. Uses the same expression as the {@code pendingTablesCount} gauge.
+ */
+ @VisibleForTesting
+ protected int getPendingTablesCount() {
+ return inReadLock(lock, pendingTieringTables::size);
+ }
+
+ /**
+ * Returns the number of table ids in {@code liveTieringTableIds}, read under the read lock. Uses
+ * the same expression as the {@code runningTablesCount} gauge.
+ */
+ @VisibleForTesting
+ protected int getRunningTablesCount() {
+ return inReadLock(lock, liveTieringTableIds::size);
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
new file mode 100644
index 000000000..bc59b11ff
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.metrics.group;
+
+import org.apache.fluss.metrics.CharacterFilter;
+import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.registry.MetricRegistry;
+
+import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
+
+/**
+ * Metric group for lake tiering metrics, created by the CoordinatorServer.
+ *
+ * <p>The group is a child of the given {@link CoordinatorMetricGroup}; its scope is derived via
+ * {@code makeScope(parent, "lakeTiering")}. {@code LakeTableTieringManager} registers its global
+ * gauges ({@code pendingTablesCount} and {@code runningTablesCount}) on this group.
+ */
+public class LakeTieringMetricGroup extends AbstractMetricGroup {
+
+ /** Scope component and group name of this metric group. */
+ private static final String NAME = "lakeTiering";
+
+ /**
+ * Creates a lake tiering metric group nested under the coordinator metric group.
+ *
+ * @param registry the metric registry handed to {@link AbstractMetricGroup}
+ * @param parent the coordinator metric group this group is nested under
+ */
+ public LakeTieringMetricGroup(MetricRegistry registry,
CoordinatorMetricGroup parent) {
+ super(registry, makeScope(parent, NAME), parent);
+ }
+
+ @Override
+ protected String getGroupName(CharacterFilter filter) {
+ // The character filter is not applied; the name is always the constant NAME.
+ return NAME;
+ }
+}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 1f19d4da7..74fd01c5e 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -194,7 +194,8 @@ class CoordinatorEventProcessorTest {
testCoordinatorChannelManager = new TestCoordinatorChannelManager();
autoPartitionManager =
new AutoPartitionManager(serverMetadataCache, metadataManager,
new Configuration());
- lakeTableTieringManager = new LakeTableTieringManager();
+ lakeTableTieringManager =
+ new
LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS);
Configuration conf = new Configuration();
String remoteDataDir = "/tmp/fluss/remote-data";
conf.setString(ConfigOptions.REMOTE_DATA_DIR, remoteDataDir);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
index 80b589dc0..f0ab38f78 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
@@ -25,6 +25,7 @@ import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
import org.apache.fluss.server.utils.timer.DefaultTimer;
import
org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
import org.apache.fluss.types.DataTypes;
@@ -62,7 +63,8 @@ class LakeTableTieringManagerTest {
return new LakeTableTieringManager(
new DefaultTimer("delay lake tiering", 1_000, 20, manualClock),
lakeTieringServiceTimeoutChecker,
- manualClock);
+ manualClock,
+ TestingMetricGroups.LAKE_TIERING_METRICS);
}
@Test
@@ -238,6 +240,44 @@ class LakeTableTieringManagerTest {
assertRequestTable(tableId1, tablePath1, 2);
}
+ /**
+ * Verifies the pending/running table counts behind the lake tiering gauges as a table is added,
+ * becomes pending, is requested for tiering, and returns to pending after a reported failure.
+ *
+ * <p>NOTE(review): this asserts through the {@code @VisibleForTesting} accessors, which use the
+ * same expressions as the registered gauges, rather than reading the gauges from the metric
+ * group itself.
+ */
+ @Test
+ void testGlobalMetrics() throws Exception {
+ // Initially no tables - verify counts are 0
+ assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(0);
+ assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0);
+
+ // Add a table
+ long tableId1 = 1L;
+ TablePath tablePath1 = TablePath.of("db", "table1");
+ TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1,
Duration.ofSeconds(10));
+ tableTieringManager.addNewLakeTable(tableInfo1);
+
+ // Advance time to make it pending - need to wait for timer to trigger
+ manualClock.advanceTime(Duration.ofSeconds(10));
+
+ // Wait for the delayed task to execute and move to pending
+ waitValue(
+ () ->
+ tableTieringManager.getPendingTablesCount() == 1
+ ? Optional.of(1)
+ : Optional.empty(),
+ Duration.ofSeconds(5),
+ "Table should be in pending state");
+
+ assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(1);
+ assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0);
+
+ // Request table - should transition to tiering
+ assertRequestTable(tableId1, tablePath1, 1);
+ assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(0);
+ assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(1);
+
+ // Report failure
+ tableTieringManager.reportTieringFail(tableId1, 1);
+ assertThat(tableTieringManager.getRunningTablesCount()).isEqualTo(0);
+ // the failed table goes back to pending
+ assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(1);
+ }
+
@Test
void testForceFinishTableTieringImmediatelyRePending() {
long tableId1 = 1L;
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
index 43ed3ae78..a961dc393 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java
@@ -93,7 +93,8 @@ public class RebalanceManagerTest {
autoPartitionManager =
new AutoPartitionManager(serverMetadataCache, metadataManager,
new Configuration());
- lakeTableTieringManager = new LakeTableTieringManager();
+ lakeTableTieringManager =
+ new
LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS);
CoordinatorEventProcessor eventProcessor =
buildCoordinatorEventProcessor();
rebalanceManager = new RebalanceManager(eventProcessor,
zookeeperClient);
rebalanceManager.startup();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
index 26dfeb8dc..b905ff969 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
@@ -122,7 +122,8 @@ class TableBucketStateMachineTest {
new Configuration(),
new LakeCatalogDynamicLoader(new
Configuration(), null, true)),
new Configuration());
- lakeTableTieringManager = new LakeTableTieringManager();
+ lakeTableTieringManager =
+ new
LakeTableTieringManager(TestingMetricGroups.LAKE_TIERING_METRICS);
kvSnapshotLeaseManager =
new KvSnapshotLeaseManager(
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
index b2e93f2bb..3bc6158df 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/metrics/group/TestingMetricGroups.java
@@ -34,6 +34,9 @@ public class TestingMetricGroups {
public static final CoordinatorMetricGroup COORDINATOR_METRICS =
new CoordinatorMetricGroup(NOPMetricRegistry.INSTANCE, "cluster1",
"host", "0");
+ public static final LakeTieringMetricGroup LAKE_TIERING_METRICS =
+ new LakeTieringMetricGroup(NOPMetricRegistry.INSTANCE,
COORDINATOR_METRICS);
+
public static final TableMetricGroup TABLE_METRICS =
new TableMetricGroup(
NOPMetricRegistry.INSTANCE,
diff --git a/website/docs/maintenance/observability/monitor-metrics.md
b/website/docs/maintenance/observability/monitor-metrics.md
index 2ed317079..8bca85f66 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="17"><strong>coordinator</strong></th>
+ <th rowspan="19"><strong>coordinator</strong></th>
<td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="10">-</td>
<td>activeCoordinatorCount</td>
<td>The number of active CoordinatorServer in this cluster.</td>
@@ -384,6 +384,17 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>All kv snapshot size of each table bucket.</td>
<td>Gauge</td>
</tr>
+ <tr>
+ <td rowspan="2">lakeTiering</td>
+ <td>pendingTablesCount</td>
+ <td>The number of tables waiting to be tiered.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>runningTablesCount</td>
+ <td>The number of tables currently being tiered.</td>
+ <td>Gauge</td>
+ </tr>
</tbody>
</table>