This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new f7fa97169 [lake/tiering] Add per-table monitoring metrics for Lake
Tiering (#2454)
f7fa97169 is described below
commit f7fa971694cf9fe81c9ede5f74302952e4dcd70b
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Mar 13 20:36:49 2026 +0800
[lake/tiering] Add per-table monitoring metrics for Lake Tiering (#2454)
---
.../fluss/lake/committer/LakeCommitResult.java | 50 ++++-
.../apache/fluss/lake/committer/TieringStats.java | 77 ++++++++
.../java/org/apache/fluss/metrics/MetricNames.java | 7 +
.../tiering/committer/TieringCommitOperator.java | 65 ++++---
.../flink/tiering/event/FinishedTieringEvent.java | 19 +-
.../source/enumerator/TieringSourceEnumerator.java | 49 +++--
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 1 +
.../lake/lance/tiering/LanceLakeCommitter.java | 1 +
.../lake/paimon/tiering/PaimonLakeCommitter.java | 40 +++-
fluss-rpc/src/main/proto/FlussApi.proto | 10 +
.../server/coordinator/CoordinatorService.java | 13 +-
.../coordinator/LakeTableTieringManager.java | 206 +++++++++++++++++++--
.../metrics/group/LakeTieringMetricGroup.java | 56 ++++++
.../coordinator/LakeTableTieringManagerTest.java | 104 ++++++++++-
.../maintenance/observability/monitor-metrics.md | 28 ++-
15 files changed, 662 insertions(+), 64 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java
index 88005f62f..737079d52 100644
---
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java
+++
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java
@@ -61,23 +61,38 @@ public class LakeCommitResult {
// 2: committedIsReadable is true, committed snapshot is just also readable
@Nullable private final ReadableSnapshot readableSnapshot;
+ @Nullable private final TieringStats tieringStats;
+
private LakeCommitResult(
long committedSnapshotId,
boolean committedIsReadable,
@Nullable ReadableSnapshot readableSnapshot,
- @Nullable Long earliestSnapshotIDToKeep) {
+ @Nullable Long earliestSnapshotIDToKeep,
+ @Nullable TieringStats tieringStats) {
this.committedSnapshotId = committedSnapshotId;
this.committedIsReadable = committedIsReadable;
this.readableSnapshot = readableSnapshot;
this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
+ this.tieringStats = tieringStats;
}
public static LakeCommitResult committedIsReadable(long
committedSnapshotId) {
- return new LakeCommitResult(committedSnapshotId, true, null,
KEEP_LATEST);
+ return committedIsReadable(committedSnapshotId, null);
+ }
+
+ public static LakeCommitResult committedIsReadable(
+ long committedSnapshotId, @Nullable TieringStats tieringStats) {
+ return new LakeCommitResult(committedSnapshotId, true, null,
KEEP_LATEST, tieringStats);
}
public static LakeCommitResult unknownReadableSnapshot(long
committedSnapshotId) {
- return new LakeCommitResult(committedSnapshotId, false, null,
KEEP_ALL_PREVIOUS);
+ return unknownReadableSnapshot(committedSnapshotId, null);
+ }
+
+ public static LakeCommitResult unknownReadableSnapshot(
+ long committedSnapshotId, @Nullable TieringStats tieringStats) {
+ return new LakeCommitResult(
+ committedSnapshotId, false, null, KEEP_ALL_PREVIOUS,
tieringStats);
}
public static LakeCommitResult withReadableSnapshot(
@@ -89,12 +104,29 @@ public class LakeCommitResult {
// the readable log end offset for readable snapshot
Map<TableBucket, Long> readableLogEndOffsets,
@Nullable Long earliestSnapshotIDToKeep) {
+ return withReadableSnapshot(
+ committedSnapshotId,
+ readableSnapshotId,
+ tieredLogEndOffsets,
+ readableLogEndOffsets,
+ earliestSnapshotIDToKeep,
+ null);
+ }
+
+ public static LakeCommitResult withReadableSnapshot(
+ long committedSnapshotId,
+ long readableSnapshotId,
+ Map<TableBucket, Long> tieredLogEndOffsets,
+ Map<TableBucket, Long> readableLogEndOffsets,
+ @Nullable Long earliestSnapshotIDToKeep,
+ @Nullable TieringStats tieringStats) {
return new LakeCommitResult(
committedSnapshotId,
false,
new ReadableSnapshot(
readableSnapshotId, tieredLogEndOffsets,
readableLogEndOffsets),
- earliestSnapshotIDToKeep);
+ earliestSnapshotIDToKeep,
+ tieringStats);
}
public long getCommittedSnapshotId() {
@@ -110,6 +142,16 @@ public class LakeCommitResult {
return readableSnapshot;
}
+ /**
+ * Gets the tiering stats.
+ *
+ * @return the tiering stats
+ */
+ @Nullable
+ public TieringStats getTieringStats() {
+ return tieringStats;
+ }
+
/**
* Gets the earliest snapshot ID to keep.
*
diff --git
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/TieringStats.java
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/TieringStats.java
new file mode 100644
index 000000000..32f0b11d9
--- /dev/null
+++
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/TieringStats.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.committer;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * Immutable statistics for a single completed tiering round of a lake table.
+ *
+ * <p>Fields use {@code null} to represent "unknown / not supported".
+ */
+public final class TieringStats implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * A {@code TieringStats} instance where every field is {@code null}
(unknown/unsupported). Use
+ * this as the default when no stats are available.
+ */
+ public static final TieringStats UNKNOWN = new TieringStats(null, null);
+
+ //
-----------------------------------------------------------------------------------------
+ // Lake data stats (reported by the lake committer)
+ //
-----------------------------------------------------------------------------------------
+
+ /** Cumulative total file size (bytes) of the lake table after this
tiering round. */
+ @Nullable private final Long fileSize;
+
+ /** Cumulative total record count of the lake table after this tiering
round. */
+ @Nullable private final Long recordCount;
+
+ public TieringStats(@Nullable Long fileSize, @Nullable Long recordCount) {
+ this.fileSize = fileSize;
+ this.recordCount = recordCount;
+ }
+
+ @Nullable
+ public Long getFileSize() {
+ return fileSize;
+ }
+
+ @Nullable
+ public Long getRecordCount() {
+ return recordCount;
+ }
+
+ /**
+ * Returns {@code true} when at least one stat field is non-{@code null},
i.e. at least one
+ * usable statistic was collected for this tiering round.
+ */
+ public boolean isAvailableStats() {
+ return fileSize != null || recordCount != null;
+ }
+
+ @Override
+ public String toString() {
+ return "TieringStats{" + "fileSize=" + fileSize + ", recordCount=" +
recordCount + '}';
+ }
+}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 1b1445665..b11c3b92f 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -63,6 +63,13 @@ public class MetricNames {
public static final String LAKE_TIERING_PENDING_TABLES_COUNT =
"pendingTablesCount";
public static final String LAKE_TIERING_RUNNING_TABLES_COUNT =
"runningTablesCount";
+ // for lake tiering table-level metrics
+ public static final String LAKE_TIERING_TABLE_TIER_LAG = "tierLag";
+ public static final String LAKE_TIERING_TABLE_TIER_DURATION =
"tierDuration";
+ public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL =
"failuresTotal";
+ public static final String LAKE_TIERING_TABLE_FILE_SIZE = "fileSize";
+ public static final String LAKE_TIERING_TABLE_RECORD_COUNT = "recordCount";
+
//
--------------------------------------------------------------------------------------------
// metrics for tablet server
//
--------------------------------------------------------------------------------------------
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 597becb75..2f3a69a7e 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -30,6 +30,7 @@ import org.apache.fluss.flink.tiering.source.TieringSource;
import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.lake.writer.LakeTieringFactory;
import org.apache.fluss.lake.writer.LakeWriter;
import org.apache.fluss.metadata.TableBucket;
@@ -94,6 +95,22 @@ public class TieringCommitOperator<WriteResult, Committable>
private final Map<Long, List<TableBucketWriteResult<WriteResult>>>
collectedTableBucketWriteResults;
+ /**
+ * The result of one table's commit round, holding the lake committable
(nullable for empty
+ * commits where no data was written) and the associated tiering
statistics.
+ */
+ private final class CommitResult {
+ /** The lake committable, or {@code null} if nothing was written in
this round. */
+ @Nullable final Committable committable;
+ /** Per-table tiering statistics collected during this round. */
+ @Nullable final TieringStats stats;
+
+ CommitResult(@Nullable Committable committable, @Nullable TieringStats
stats) {
+ this.committable = committable;
+ this.stats = stats;
+ }
+ }
+
public TieringCommitOperator(
StreamOperatorParameters<CommittableMessage<Committable>>
parameters,
Configuration flussConf,
@@ -135,18 +152,20 @@ public class TieringCommitOperator<WriteResult,
Committable>
if (committableWriteResults != null) {
try {
- Committable committable =
+ CommitResult commitResult =
commitWriteResults(
tableId,
tableBucketWriteResult.tablePath(),
committableWriteResults);
- // only emit when committable is not-null
- if (committable != null) {
- output.collect(new StreamRecord<>(new
CommittableMessage<>(committable)));
+ // only emit downstream when actual data was written
+ if (commitResult.committable != null) {
+ output.collect(
+ new StreamRecord<>(new
CommittableMessage<>(commitResult.committable)));
}
// notify that the table id has been finished tier
operatorEventGateway.sendEventToCoordinator(
- new SourceEventWrapper(new
FinishedTieringEvent(tableId)));
+ new SourceEventWrapper(
+ new FinishedTieringEvent(tableId,
commitResult.stats)));
} catch (Exception e) {
// if any exception happens, send to source coordinator to
mark it as failed
operatorEventGateway.sendEventToCoordinator(
@@ -162,28 +181,31 @@ public class TieringCommitOperator<WriteResult,
Committable>
}
}
- @Nullable
- private Committable commitWriteResults(
+ /**
+ * Commits the collected write results for one table to the lake and Fluss.
+ *
+ * <p>Always returns a non-null {@link CommitResult}. When all buckets
produced no data (empty
+ * commit), {@link CommitResult#committable} is {@code null} and {@link
+ * CommitResult#stats} is also {@code null}.
+ */
+ private CommitResult commitWriteResults(
long tableId,
TablePath tablePath,
List<TableBucketWriteResult<WriteResult>> committableWriteResults)
throws Exception {
- // filter out non-null write result
- committableWriteResults =
+ // filter down to buckets that actually produced data
+ List<TableBucketWriteResult<WriteResult>> nonEmptyResults =
committableWriteResults.stream()
- .filter(
- writeResultTableBucketWriteResult ->
-
writeResultTableBucketWriteResult.writeResult() != null)
+ .filter(r -> r.writeResult() != null)
.collect(Collectors.toList());
- // empty, means all write result is null, which is a empty commit,
- // return null to skip the empty commit
- if (committableWriteResults.isEmpty()) {
+ // all buckets were empty — nothing to commit to the lake
+ if (nonEmptyResults.isEmpty()) {
LOG.info(
"Commit tiering write results is empty for table {}, table
path {}",
tableId,
tablePath);
- return null;
+ return new CommitResult(null, null);
}
// Check if the table was dropped and recreated during tiering.
@@ -202,18 +224,15 @@ public class TieringCommitOperator<WriteResult,
Committable>
try (LakeCommitter<WriteResult, Committable> lakeCommitter =
lakeTieringFactory.createLakeCommitter(
new TieringCommitterInitContext(
- tablePath,
- admin.getTableInfo(tablePath).get(),
- lakeTieringConfig,
- flussConfig))) {
+ tablePath, currentTableInfo,
lakeTieringConfig, flussConfig))) {
List<WriteResult> writeResults =
- committableWriteResults.stream()
+ nonEmptyResults.stream()
.map(TableBucketWriteResult::writeResult)
.collect(Collectors.toList());
Map<TableBucket, Long> logEndOffsets = new HashMap<>();
Map<TableBucket, Long> logMaxTieredTimestamps = new HashMap<>();
- for (TableBucketWriteResult<WriteResult> writeResult :
committableWriteResults) {
+ for (TableBucketWriteResult<WriteResult> writeResult :
nonEmptyResults) {
TableBucket tableBucket = writeResult.tableBucket();
logEndOffsets.put(tableBucket, writeResult.logEndOffset());
logMaxTieredTimestamps.put(tableBucket,
writeResult.maxTimestamp());
@@ -251,7 +270,7 @@ public class TieringCommitOperator<WriteResult, Committable>
lakeBucketTieredOffsetsFile,
logEndOffsets,
logMaxTieredTimestamps);
- return committable;
+ return new CommitResult(committable,
lakeCommitResult.getTieringStats());
}
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java
index 0f10dc699..a1b409322 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java
@@ -17,8 +17,12 @@
package org.apache.fluss.flink.tiering.event;
+import org.apache.fluss.lake.committer.TieringStats;
+
import org.apache.flink.api.connector.source.SourceEvent;
+import javax.annotation.Nullable;
+
/** SourceEvent used to represent a Fluss table has been tiered finished. */
public class FinishedTieringEvent implements SourceEvent {
@@ -26,11 +30,24 @@ public class FinishedTieringEvent implements SourceEvent {
private final long tableId;
- public FinishedTieringEvent(long tableId) {
+ /** Statistics collected during this tiering round. */
+ @Nullable private final TieringStats stats;
+
+ public FinishedTieringEvent(long tableId, @Nullable TieringStats stats) {
this.tableId = tableId;
+ this.stats = stats;
+ }
+
+ public FinishedTieringEvent(long tableId) {
+ this(tableId, null);
}
public long getTableId() {
return tableId;
}
+
+ @Nullable
+ public TieringStats getStats() {
+ return stats;
+ }
}
diff --git
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 33615039b..337222f4e 100644
---
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -30,6 +30,7 @@ import
org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
import org.apache.fluss.flink.tiering.source.split.TieringSplit;
import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
import
org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
+import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.rpc.GatewayClientProxy;
@@ -38,6 +39,7 @@ import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable;
+import org.apache.fluss.rpc.messages.PbLakeTieringStats;
import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.utils.MapUtils;
@@ -272,10 +274,10 @@ public class TieringSourceEnumerator
finishedTableId);
} else {
boolean isForceFinished =
tieringReachMaxDurationsTables.remove(finishedTableId);
- LOG.info("Before finishedTables table {}.", finishedTables);
finishedTables.put(
- finishedTableId, TieringFinishInfo.from(tieringEpoch,
isForceFinished));
- LOG.info("After finishedTables table {}.", finishedTables);
+ finishedTableId,
+ TieringFinishInfo.from(
+ tieringEpoch, isForceFinished,
finishedTieringEvent.getStats()));
}
}
@@ -578,17 +580,32 @@ public class TieringSourceEnumerator
toPbHeartbeatReqForTable(tieringTableEpochs,
coordinatorEpoch));
}
if (!finishedTables.isEmpty()) {
- Map<Long, Long> finishTieringEpochs = new HashMap<>();
Set<Long> forceFinishedTables = new HashSet<>();
+ List<PbHeartbeatReqForTable> finishedTableReqs = new
ArrayList<>();
finishedTables.forEach(
(tableId, tieringFinishInfo) -> {
- finishTieringEpochs.put(tableId,
tieringFinishInfo.tieringEpoch);
if (tieringFinishInfo.isForceFinished) {
forceFinishedTables.add(tableId);
}
+ PbHeartbeatReqForTable pbHeartbeatReqForTable =
+ new PbHeartbeatReqForTable()
+ .setTableId(tableId)
+
.setCoordinatorEpoch(coordinatorEpoch)
+
.setTieringEpoch(tieringFinishInfo.tieringEpoch);
+ TieringStats stats = tieringFinishInfo.stats;
+ if (stats.isAvailableStats()) {
+ PbLakeTieringStats pbLakeTieringStats = new
PbLakeTieringStats();
+ if (stats.getFileSize() != null) {
+
pbLakeTieringStats.setFileSize(stats.getFileSize());
+ }
+ if (stats.getRecordCount() != null) {
+
pbLakeTieringStats.setRecordCount(stats.getRecordCount());
+ }
+
pbHeartbeatReqForTable.setLakeTieringStats(pbLakeTieringStats);
+ }
+ finishedTableReqs.add(pbHeartbeatReqForTable);
});
- heartbeatRequest.addAllFinishedTables(
- toPbHeartbeatReqForTable(finishTieringEpochs,
coordinatorEpoch));
+ heartbeatRequest.addAllFinishedTables(finishedTableReqs);
for (long forceFinishedTableId : forceFinishedTables) {
heartbeatRequest.addForceFinishedTable(forceFinishedTableId);
}
@@ -638,26 +655,32 @@ public class TieringSourceEnumerator
private static class TieringFinishInfo {
/** The epoch of the tiering operation for this table. */
- long tieringEpoch;
+ final long tieringEpoch;
/**
* Whether this table was force finished due to reaching the maximum
tiering duration. When
* a table's tiering operation exceeds the max duration (data lake
freshness), it will be
* force finished to prevent it from blocking other tables' tiering
operations.
*/
- boolean isForceFinished;
+ final boolean isForceFinished;
+
+ /** Stats collected during this tiering round. */
+ final TieringStats stats;
public static TieringFinishInfo from(long tieringEpoch) {
- return new TieringFinishInfo(tieringEpoch, false);
+ return new TieringFinishInfo(tieringEpoch, false, null);
}
- public static TieringFinishInfo from(long tieringEpoch, boolean
isForceFinished) {
- return new TieringFinishInfo(tieringEpoch, isForceFinished);
+ public static TieringFinishInfo from(
+ long tieringEpoch, boolean isForceFinished, @Nullable
TieringStats stats) {
+ return new TieringFinishInfo(tieringEpoch, isForceFinished, stats);
}
- private TieringFinishInfo(long tieringEpoch, boolean isForceFinished) {
+ private TieringFinishInfo(
+ long tieringEpoch, boolean isForceFinished, @Nullable
TieringStats stats) {
this.tieringEpoch = tieringEpoch;
this.isForceFinished = isForceFinished;
+ this.stats = stats != null ? stats : TieringStats.UNKNOWN;
}
}
}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index e96a8be33..06b572760 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -141,6 +141,7 @@ public class IcebergLakeCommitter implements
LakeCommitter<IcebergWriteResult, I
snapshotId = rewriteCommitSnapshotId;
}
}
+ // Iceberg does not provide a cumulative table stats API yet; leave
stats unset (unknown).
return LakeCommitResult.committedIsReadable(snapshotId);
} catch (Exception e) {
throw new IOException("Failed to commit to Iceberg table.", e);
diff --git
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
index 47c163cdb..4fb7b96f0 100644
---
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
+++
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
@@ -78,6 +78,7 @@ public class LanceLakeCommitter implements
LakeCommitter<LanceWriteResult, Lance
properties.put(committerName, FLUSS_LAKE_TIERING_COMMIT_USER);
long snapshotId =
LanceDatasetAdapter.commitAppend(config,
committable.committable(), properties);
+ // Lance does not provide a cumulative table stats API yet; leave stats
unset (unknown).
return LakeCommitResult.committedIsReadable(snapshotId);
}
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 03ced0db2..1018fe78d 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -23,17 +23,20 @@ import
org.apache.fluss.lake.committer.CommittedLakeSnapshot;
import org.apache.fluss.lake.committer.CommitterInitContext;
import org.apache.fluss.lake.committer.LakeCommitResult;
import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.lake.paimon.utils.DvTableReadableSnapshotRetriever;
import org.apache.fluss.metadata.TablePath;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.SimpleFileEntry;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.utils.SnapshotManager;
@@ -46,6 +49,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static
org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
@@ -112,9 +116,12 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
"Paimon committed snapshot id must be non-null.");
currentCommitSnapshotId.remove();
+ // Collect cumulative table stats from the exact snapshot that was
just committed.
+ TieringStats stats = computeTableStats();
+
// deletion vector is disabled, committed snapshot is readable
if (!fileStoreTable.coreOptions().deletionVectorsEnabled()) {
- return
LakeCommitResult.committedIsReadable(committedSnapshotId);
+ return
LakeCommitResult.committedIsReadable(committedSnapshotId, stats);
} else {
// retrieve the readable snapshot during commit
try (DvTableReadableSnapshotRetriever retriever =
@@ -123,7 +130,7 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
DvTableReadableSnapshotRetriever.ReadableSnapshotResult
readableSnapshotResult =
retriever.getReadableSnapshotAndOffsets(committedSnapshotId);
if (readableSnapshotResult == null) {
- return
LakeCommitResult.unknownReadableSnapshot(committedSnapshotId);
+ return
LakeCommitResult.unknownReadableSnapshot(committedSnapshotId, stats);
} else {
long earliestSnapshotIdToKeep =
readableSnapshotResult.getEarliestSnapshotIdToKeep();
@@ -139,7 +146,8 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
readableSnapshotResult.getReadableSnapshotId(),
readableSnapshotResult.getTieredOffsets(),
readableSnapshotResult.getReadableOffsets(),
- earliestSnapshotIdToKeep);
+ earliestSnapshotIdToKeep,
+ stats);
}
}
}
@@ -153,6 +161,32 @@ public class PaimonLakeCommitter implements
LakeCommitter<PaimonWriteResult, Pai
}
}
+ /** Computes cumulative table stats from the latest snapshot via the
Paimon catalog. */
+ @Nullable
+ private TieringStats computeTableStats() {
+ Identifier identifier =
+ new Identifier(tablePath.getDatabaseName(),
tablePath.getTableName());
+ try {
+ Optional<TableSnapshot> snapshot =
paimonCatalog.loadSnapshot(identifier);
+ if (!snapshot.isPresent()) {
+ LOG.warn(
+ "No snapshot found for table {}, "
+ + "fileSize and recordCount will be reported
as -1.",
+ tablePath);
+ return null;
+ }
+ TableSnapshot tableSnapshot = snapshot.get();
+ return new TieringStats(tableSnapshot.fileSizeInBytes(),
tableSnapshot.recordCount());
+ } catch (Exception e) {
+ LOG.debug(
+ "Failed to load snapshot for table {}, "
+ + "fileSize and recordCount will be reported as
-1.",
+ tablePath,
+ e);
+ return null;
+ }
+ }
+
@Override
public void abort(PaimonCommittable committable) throws IOException {
tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index f04b30756..b14238861 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -1063,6 +1063,16 @@ message PbHeartbeatReqForTable {
required int32 coordinator_epoch = 2;
// the tiering epoch when the table is assigned to be tiering service
required int64 tiering_epoch = 3;
+ // stats for the last tiering round; only set for finished_tables in
LakeTieringHeartbeatRequest
+ optional PbLakeTieringStats lake_tiering_stats = 4;
+}
+
+// Stats reported by the tiering service when a table finishes a tiering round.
+message PbLakeTieringStats {
+ // cumulative total file size (bytes) of the lake table after tiering
+ optional int64 file_size = 1;
+ // cumulative total record count of the lake table after tiering
+ optional int64 record_count = 2;
}
message PbHeartbeatRespForTable {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 388f1816f..17b6d934c 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -41,6 +41,7 @@ import org.apache.fluss.exception.UnknownServerException;
import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.fs.FileSystem;
import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.lake.lakestorage.LakeCatalog;
import org.apache.fluss.metadata.DataLakeFormat;
import org.apache.fluss.metadata.DatabaseDescriptor;
@@ -106,6 +107,7 @@ import org.apache.fluss.rpc.messages.PbAlterConfig;
import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable;
import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable;
import org.apache.fluss.rpc.messages.PbKvSnapshotLeaseForTable;
+import org.apache.fluss.rpc.messages.PbLakeTieringStats;
import org.apache.fluss.rpc.messages.PbPrepareLakeTableRespForTable;
import org.apache.fluss.rpc.messages.PbProducerTableOffsets;
import org.apache.fluss.rpc.messages.PbTableBucket;
@@ -853,10 +855,19 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
heartbeatResponse.addFinishedTableResp().setTableId(finishTable.getTableId());
try {
validateHeartbeatRequest(finishTable, currentCoordinatorEpoch);
+ TieringStats stats = TieringStats.UNKNOWN;
+ if (finishTable.hasLakeTieringStats()) {
+ PbLakeTieringStats pbStats =
finishTable.getLakeTieringStats();
+ stats =
+ new TieringStats(
+ pbStats.hasFileSize() ?
pbStats.getFileSize() : null,
+ pbStats.hasRecordCount() ?
pbStats.getRecordCount() : null);
+ }
lakeTableTieringManager.finishTableTiering(
finishTable.getTableId(),
finishTable.getTieringEpoch(),
-
forceFinishedTableId.contains(finishTable.getTableId()));
+
forceFinishedTableId.contains(finishTable.getTableId()),
+ stats);
} catch (Throwable e) {
pbHeartbeatRespForTable.setError(ApiError.fromThrowable(e).toErrorResponse());
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
index 6d412f5b7..a3448d148 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
@@ -21,9 +21,12 @@ import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.exception.FencedTieringEpochException;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.Counter;
import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
import org.apache.fluss.server.utils.timer.DefaultTimer;
@@ -54,6 +57,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.ToLongFunction;
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
@@ -128,8 +132,14 @@ public class LakeTableTieringManager implements
AutoCloseable {
// cache table_id -> table tiering epoch
private final Map<Long, Long> tableTierEpoch;
- // table_id -> the last timestamp of tiered lake snapshot
- private final Map<Long, Long> tableLastTieredTime;
+ // table_id -> result snapshot of the last completed tiering round
+ private final Map<Long, LastTieringResult> lastTieringResult;
+
+ // table_id -> the tiering failure counter
+ private final Map<Long, Counter> tableFailureCounters;
+
+ // table_id -> start time (ms) of the currently in-progress tiering round
+ private final Map<Long, Long> currentTieringStartTime;
// the live tables that are tiering,
// from table_id -> last heartbeat time by the tiering service
@@ -170,8 +180,10 @@ public class LakeTableTieringManager implements
AutoCloseable {
this.lakeTieringServiceTimeoutChecker.scheduleWithFixedDelay(
this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS);
this.tableTierEpoch = new HashMap<>();
- this.tableLastTieredTime = new HashMap<>();
+ this.lastTieringResult = new HashMap<>();
this.delayedTieringByTableId = new HashMap<>();
+ this.tableFailureCounters = new HashMap<>();
+ this.currentTieringStartTime = new HashMap<>();
this.tieringMetricGroup = lakeTieringMetricGroup;
registerMetrics();
}
@@ -218,14 +230,18 @@ public class LakeTableTieringManager implements
AutoCloseable {
tablePaths.put(tableId, tableInfo.getTablePath());
tableLakeFreshness.put(
tableId,
tableInfo.getTableConfig().getDataLakeFreshness().toMillis());
- tableLastTieredTime.put(tableId, lastTieredTime);
+ lastTieringResult.put(tableId,
LastTieringResult.initial(lastTieredTime));
tableTierEpoch.put(tableId, 0L);
+
+ // register table-level metrics
+ registerTableMetrics(tableId, tableInfo.getTablePath());
}
+ @GuardedBy("lock")
private void scheduleTableTiering(long tableId) {
Long freshnessInterval = tableLakeFreshness.get(tableId);
- Long lastTieredTime = tableLastTieredTime.get(tableId);
- if (freshnessInterval == null || lastTieredTime == null) {
+ LastTieringResult lastResult = lastTieringResult.get(tableId);
+ if (freshnessInterval == null || lastResult == null) {
// the table has been dropped, return directly
return;
}
@@ -234,7 +250,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
if (existingDelayedTiering != null) {
existingDelayedTiering.cancel();
}
- long delayMs = freshnessInterval - (clock.milliseconds() -
lastTieredTime);
+ long delayMs = freshnessInterval - (clock.milliseconds() -
lastResult.tieredTime);
// if the delayMs is < 0, the DelayedTiering will be triggered at once
without
// adding into timing wheel.
DelayedTiering delayedTiering = new DelayedTiering(tableId, delayMs);
@@ -242,13 +258,67 @@ public class LakeTableTieringManager implements
AutoCloseable {
lakeTieringScheduleTimer.add(delayedTiering);
}
+ private void registerTableMetrics(long tableId, TablePath tablePath) {
+ // create table-level metric group
+ MetricGroup tableMetricGroup =
+ tieringMetricGroup.addTableLakeTieringMetricGroup(tableId,
tablePath);
+
+ // tierLag: milliseconds since last successful tiering
+ tableMetricGroup.gauge(
+ MetricNames.LAKE_TIERING_TABLE_TIER_LAG,
+ () ->
+ inReadLock(
+ lock,
+ () -> {
+ LastTieringResult r =
lastTieringResult.get(tableId);
+ return r != null ? clock.milliseconds() -
r.tieredTime : -1L;
+ }));
+
+ // tierDuration: duration of last tiering job
+ tableMetricGroup.gauge(
+ MetricNames.LAKE_TIERING_TABLE_TIER_DURATION,
+ () -> inReadLock(lock, () -> getLastResultField(tableId, r ->
r.tierDuration)));
+
+ // failuresTotal: total failure count for this table
+ Counter failuresCounter =
+
tableMetricGroup.counter(MetricNames.LAKE_TIERING_TABLE_FAILURES_TOTAL);
+ tableFailureCounters.put(tableId, failuresCounter);
+
+ // fileSize: cumulative total file size of the lake table after the
last tiering
+ tableMetricGroup.gauge(
+ MetricNames.LAKE_TIERING_TABLE_FILE_SIZE,
+ () -> inReadLock(lock, () -> getLastResultField(tableId, r ->
r.fileSize)));
+
+ // recordCount: cumulative total record count of the lake table after
the last tiering
+ tableMetricGroup.gauge(
+ MetricNames.LAKE_TIERING_TABLE_RECORD_COUNT,
+ () -> inReadLock(lock, () -> getLastResultField(tableId, r ->
r.recordCount)));
+ }
+
+ /**
+ * Returns the value of a single field from the {@link LastTieringResult}
for the given table,
+ * or {@code -1} if the table has no completed tiering result (e.g. not
yet tiered or already
+ * removed).
+ *
+ * <p>Must be called under {@link #lock} (read or write).
+ */
+ @GuardedBy("lock")
+ long getLastResultField(long tableId, ToLongFunction<LastTieringResult>
fieldExtractor) {
+ LastTieringResult r = lastTieringResult.get(tableId);
+ return r != null ? fieldExtractor.applyAsLong(r) : -1L;
+ }
+
public void removeLakeTable(long tableId) {
inWriteLock(
lock,
() -> {
tablePaths.remove(tableId);
tableLakeFreshness.remove(tableId);
- tableLastTieredTime.remove(tableId);
+ lastTieringResult.remove(tableId);
+ currentTieringStartTime.remove(tableId);
+ tableFailureCounters.remove(tableId);
+ // close and remove the metric group to unregister metrics
+
tieringMetricGroup.removeTableLakeTieringMetricGroup(tableId);
tieringStates.remove(tableId);
liveTieringTableIds.remove(tableId);
tableTierEpoch.remove(tableId);
@@ -350,11 +420,13 @@ public class LakeTableTieringManager implements
AutoCloseable {
});
}
- public void finishTableTiering(long tableId, long tieredEpoch, boolean
isForceFinished) {
+ public void finishTableTiering(
+ long tableId, long tieredEpoch, boolean isForceFinished,
TieringStats stats) {
inWriteLock(
lock,
() -> {
validateTieringServiceRequest(tableId, tieredEpoch);
+ updateTableTieringResult(tableId, stats);
// to tiered state firstly
doHandleStateChange(tableId, TieringState.Tiered);
if (isForceFinished) {
@@ -367,6 +439,34 @@ public class LakeTableTieringManager implements
AutoCloseable {
});
}
+ @GuardedBy("lock")
+ private void updateTableTieringResult(long tableId, TieringStats stats) {
+ Long startTime = currentTieringStartTime.get(tableId);
+ long now = clock.milliseconds();
+ long duration = startTime != null ? now - startTime : -1L;
+ if (stats.isAvailableStats()) {
+ lastTieringResult.put(
+ tableId,
+ new LastTieringResult(
+ now,
+ duration,
+ stats.getFileSize() != null ? stats.getFileSize()
: -1L,
+ stats.getRecordCount() != null ?
stats.getRecordCount() : -1L));
+ } else {
+ lastTieringResult.compute(
+ tableId,
+ (k, prev) -> {
+ if (prev == null) {
+ throw new IllegalStateException(
+ "Last tiering result not found for table "
+ tableId);
+ } else {
+ return new LastTieringResult(
+ now, duration, prev.fileSize,
prev.recordCount);
+ }
+ });
+ }
+ }
+
public void reportTieringFail(long tableId, long tieringEpoch) {
inWriteLock(
lock,
@@ -482,13 +582,19 @@ public class LakeTableTieringManager implements
AutoCloseable {
break;
case Tiering:
liveTieringTableIds.put(tableId, clock.milliseconds());
+ currentTieringStartTime.put(tableId, clock.milliseconds());
break;
case Tiered:
- tableLastTieredTime.put(tableId, clock.milliseconds());
liveTieringTableIds.remove(tableId);
+ currentTieringStartTime.remove(tableId);
break;
case Failed:
+ Counter counter = tableFailureCounters.get(tableId);
+ if (counter != null) {
+ counter.inc();
+ }
liveTieringTableIds.remove(tableId);
+ currentTieringStartTime.remove(tableId);
// do nothing
break;
}
@@ -514,7 +620,7 @@ public class LakeTableTieringManager implements
AutoCloseable {
"Successfully changed tiering state for table {} from {} to
{}.",
tableId,
fromState,
- fromState);
+ toState);
}
@Override
@@ -636,13 +742,87 @@ public class LakeTableTieringManager implements
AutoCloseable {
abstract Set<TieringState> validPreviousStates();
}
+ /**
+ * Immutable snapshot of the statistics collected at the end of one
completed tiering round.
+ * Written atomically when a table transitions to {@link
TieringState#Tiered}.
+ */
@VisibleForTesting
- protected int getPendingTablesCount() {
+ static class LastTieringResult {
+
+ /** Timestamp (ms) when this tiering round completed. */
+ final long tieredTime;
+
+ /** Wall-clock duration (ms) of this tiering round; {@code -1} if
unknown. */
+ final long tierDuration;
+
+ /**
+ * Cumulative total file size (bytes) of the lake table after this
round; {@code -1} if not
+ * reported by the lake implementation (converted from {@code null} in
{@link
+ * TieringStats}).
+ */
+ final long fileSize;
+
+ /**
+ * Cumulative total record count of the lake table after this round;
{@code -1} if not
+ * reported by the lake implementation (converted from {@code null} in
{@link
+ * TieringStats}).
+ */
+ final long recordCount;
+
+ LastTieringResult(long tieredTime, long tierDuration, long fileSize,
long recordCount) {
+ this.tieredTime = tieredTime;
+ this.tierDuration = tierDuration;
+ this.fileSize = fileSize;
+ this.recordCount = recordCount;
+ }
+
+ /**
+ * Creates the initial placeholder used when a table is first
registered. Only {@code
+ * tieredTime} is meaningful; all other stats are {@code -1} until the
first round
+ * completes.
+ */
+ static LastTieringResult initial(long tieredTime) {
+ return new LastTieringResult(tieredTime, -1L, -1L, -1L);
+ }
+ }
+
+ @VisibleForTesting
+ int getPendingTablesCount() {
return inReadLock(lock, pendingTieringTables::size);
}
@VisibleForTesting
- protected int getRunningTablesCount() {
+ int getRunningTablesCount() {
return inReadLock(lock, liveTieringTableIds::size);
}
+
+ @VisibleForTesting
+ Long getTableLastSuccessTime(long tableId) {
+ return inReadLock(
+ lock,
+ () -> {
+ LastTieringResult r = lastTieringResult.get(tableId);
+ return r != null ? r.tieredTime : null;
+ });
+ }
+
+ @VisibleForTesting
+ Long getTableFailureCount(long tableId) {
+ return inReadLock(
+ lock,
+ () -> {
+ Counter c = tableFailureCounters.get(tableId);
+ return c != null ? c.getCount() : 0L;
+ });
+ }
+
+ @VisibleForTesting
+ TieringState getTableState(long tableId) {
+ return inReadLock(lock, () -> tieringStates.get(tableId));
+ }
+
+ @VisibleForTesting
+ long getLastTieringResultField(long tableId,
ToLongFunction<LastTieringResult> fieldExtractor) {
+ return inReadLock(lock, () -> getLastResultField(tableId,
fieldExtractor));
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
index bc59b11ff..85724cfca 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
@@ -18,10 +18,15 @@
package org.apache.fluss.server.metrics.group;
+import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.metrics.CharacterFilter;
import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.groups.MetricGroup;
import org.apache.fluss.metrics.registry.MetricRegistry;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
/** Metrics for lake tiering. */
@@ -29,6 +34,8 @@ public class LakeTieringMetricGroup extends
AbstractMetricGroup {
private static final String NAME = "lakeTiering";
+ private final Map<Long, TableMetricGroup> metricGroupByTable = new
HashMap<>();
+
public LakeTieringMetricGroup(MetricRegistry registry,
CoordinatorMetricGroup parent) {
super(registry, makeScope(parent, NAME), parent);
}
@@ -37,4 +44,53 @@ public class LakeTieringMetricGroup extends
AbstractMetricGroup {
protected String getGroupName(CharacterFilter filter) {
return NAME;
}
+
+ // ------------------------------------------------------------------------
+ // table lake tiering groups
+ // ------------------------------------------------------------------------
+ public MetricGroup addTableLakeTieringMetricGroup(long tableId, TablePath
tablePath) {
+ return metricGroupByTable.computeIfAbsent(
+ tableId, table -> new TableMetricGroup(registry, this,
tablePath, tableId));
+ }
+
+ public void removeTableLakeTieringMetricGroup(long tableId) {
+ // get the metric group of the table
+ TableMetricGroup tableMetricGroup = metricGroupByTable.get(tableId);
+        // if the table metric group exists, close it and remove it to unregister its metrics
+ if (tableMetricGroup != null) {
+ tableMetricGroup.close();
+ metricGroupByTable.remove(tableId);
+ }
+ }
+
+ /** The metric group for table. */
+ public static class TableMetricGroup extends AbstractMetricGroup {
+
+ private static final String NAME = "table";
+
+ private final TablePath tablePath;
+ private final long tableId;
+
+ public TableMetricGroup(
+ MetricRegistry registry,
+ LakeTieringMetricGroup parent,
+ TablePath tablePath,
+ long tableId) {
+ super(registry, makeScope(parent, NAME), parent);
+ this.tablePath = tablePath;
+ this.tableId = tableId;
+ }
+
+ @Override
+ protected void putVariables(Map<String, String> variables) {
+ variables.put("database", tablePath.getDatabaseName());
+ variables.put("table", tablePath.getTableName());
+ variables.put("tableId", String.valueOf(tableId));
+ }
+
+ @Override
+ protected String getGroupName(CharacterFilter filter) {
+ return NAME;
+ }
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
index f0ab38f78..d615f31b2 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.coordinator;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.exception.FencedTieringEpochException;
import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.lake.committer.TieringStats;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
@@ -133,7 +134,10 @@ class LakeTableTieringManagerTest {
assertThatThrownBy(() ->
tableTieringManager.reportTieringFail(tableId1, 1))
.isInstanceOf(TableNotExistException.class)
.hasMessage("The table %d doesn't exist.", tableId1);
- assertThatThrownBy(() ->
tableTieringManager.finishTableTiering(tableId1, 1, false))
+ assertThatThrownBy(
+ () ->
+ tableTieringManager.finishTableTiering(
+ tableId1, 1, false,
TieringStats.UNKNOWN))
.isInstanceOf(TableNotExistException.class)
.hasMessage("The table %d doesn't exist.", tableId1);
}
@@ -154,7 +158,7 @@ class LakeTableTieringManagerTest {
assertThat(tableTieringManager.requestTable()).isNull();
// mock lake tiering finish one-round tiering
- tableTieringManager.finishTableTiering(tableId1, tieredEpoch, false);
+ tableTieringManager.finishTableTiering(tableId1, tieredEpoch, false,
TieringStats.UNKNOWN);
// not advance time, request table should return null
assertThat(tableTieringManager.requestTable()).isNull();
@@ -213,12 +217,18 @@ class LakeTableTieringManagerTest {
.hasMessage(
"The tiering epoch %d is not match current epoch %d in
coordinator for table %d.",
1, 2, tableId1);
- assertThatThrownBy(() ->
tableTieringManager.finishTableTiering(tableId1, 1, false))
+ assertThatThrownBy(
+ () ->
+ tableTieringManager.finishTableTiering(
+ tableId1, 1, false,
TieringStats.UNKNOWN))
.isInstanceOf(FencedTieringEpochException.class)
.hasMessage(
"The tiering epoch %d is not match current epoch %d in
coordinator for table %d.",
1, 2, tableId1);
- assertThatThrownBy(() ->
tableTieringManager.finishTableTiering(tableId1, 3, false))
+ assertThatThrownBy(
+ () ->
+ tableTieringManager.finishTableTiering(
+ tableId1, 3, false,
TieringStats.UNKNOWN))
.isInstanceOf(FencedTieringEpochException.class)
.hasMessage(
"The tiering epoch %d is not match current epoch %d in
coordinator for table %d.",
@@ -278,6 +288,90 @@ class LakeTableTieringManagerTest {
assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(1);
// back to pending
}
+ @Test
+ void testTableLevelMetrics() {
+ long tableId1 = 1L;
+ TablePath tablePath1 = TablePath.of("db", "table1");
+ TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1,
Duration.ofSeconds(10));
+ tableTieringManager.addNewLakeTable(tableInfo1);
+
+        // Initially, the table has just been registered, so lastSuccessTime is the current time
+ Long initialTime =
tableTieringManager.getTableLastSuccessTime(tableId1);
+ assertThat(initialTime).isNotNull();
+ assertThat(initialTime).isEqualTo(manualClock.milliseconds());
+
+ // Advance time and request table
+ manualClock.advanceTime(Duration.ofSeconds(10));
+ assertRequestTable(tableId1, tablePath1, 1);
+
+ // State should be Tiering (4)
+ assertThat(tableTieringManager.getTableState(tableId1))
+ .isEqualTo(LakeTableTieringManager.TieringState.Tiering);
+
+ // Simulate tiering duration — finish with UNKNOWN stats (empty
commit).
+ // Duration should still be updated even when no stats/data are
reported.
+ manualClock.advanceTime(Duration.ofSeconds(5));
+ tableTieringManager.finishTableTiering(tableId1, 1, false,
TieringStats.UNKNOWN);
+ assertThat(tableTieringManager.getLastTieringResultField(tableId1, r
-> r.tierDuration))
+ .isEqualTo(5000L);
+
+ // lastSuccessTime should be just now
+ assertThat(tableTieringManager.getTableLastSuccessTime(tableId1))
+ .isEqualTo(manualClock.milliseconds());
+
+        // fileSize and recordCount are not reported in the stats, so both map to -1
+ assertThat(tableTieringManager.getLastTieringResultField(tableId1, r
-> r.fileSize))
+ .isEqualTo(-1L);
+ assertThat(tableTieringManager.getLastTieringResultField(tableId1, r
-> r.recordCount))
+ .isEqualTo(-1L);
+
+ // State should be Scheduled (2) after finish
+ assertThat(tableTieringManager.getTableState(tableId1))
+ .isEqualTo(LakeTableTieringManager.TieringState.Scheduled);
+
+ // Advance time to make lastSuccessAge increase
+ manualClock.advanceTime(Duration.ofSeconds(3));
+ long lastSuccessAge =
+ manualClock.milliseconds() -
tableTieringManager.getTableLastSuccessTime(tableId1);
+ assertThat(lastSuccessAge).isEqualTo(3000L);
+
+ // Request again and finish with valid stats
+ manualClock.advanceTime(Duration.ofSeconds(7));
+ assertRequestTable(tableId1, tablePath1, 2);
+ manualClock.advanceTime(Duration.ofSeconds(4));
+ long expectedFileSize = 1024L * 1024L;
+ long expectedRecordCount = 500L;
+ tableTieringManager.finishTableTiering(
+ tableId1, 2, false, new TieringStats(expectedFileSize,
expectedRecordCount));
+
+ // fileSize and recordCount should reflect the reported stats
+ assertThat(tableTieringManager.getLastTieringResultField(tableId1, r
-> r.fileSize))
+ .isEqualTo(expectedFileSize);
+ assertThat(tableTieringManager.getLastTieringResultField(tableId1, r
-> r.recordCount))
+ .isEqualTo(expectedRecordCount);
+ // duration of the second round should be 4s
+ assertThat(tableTieringManager.getLastTieringResultField(tableId1, r
-> r.tierDuration))
+ .isEqualTo(4000L);
+
+ // Request again and report failure — stats should remain from last
successful tiering
+ manualClock.advanceTime(Duration.ofSeconds(10));
+ assertRequestTable(tableId1, tablePath1, 3);
+ tableTieringManager.reportTieringFail(tableId1, 3);
+
+ // Failures should increment
+
assertThat(tableTieringManager.getTableFailureCount(tableId1)).isEqualTo(1L);
+
+ // fileSize and recordCount should remain unchanged after failure
+ assertThat(tableTieringManager.getLastTieringResultField(tableId1, r
-> r.fileSize))
+ .isEqualTo(expectedFileSize);
+ assertThat(tableTieringManager.getLastTieringResultField(tableId1, r
-> r.recordCount))
+ .isEqualTo(expectedRecordCount);
+
+ // State should be Pending (3) after failure
+ assertThat(tableTieringManager.getTableState(tableId1))
+ .isEqualTo(LakeTableTieringManager.TieringState.Pending);
+ }
+
@Test
void testForceFinishTableTieringImmediatelyRePending() {
long tableId1 = 1L;
@@ -292,7 +386,7 @@ class LakeTableTieringManagerTest {
assertThat(tableTieringManager.requestTable()).isNull();
// mock lake tiering force finish (e.g., due to exceeding tiering
duration)
- tableTieringManager.finishTableTiering(tableId1, 1, true);
+ tableTieringManager.finishTableTiering(tableId1, 1, true,
TieringStats.UNKNOWN);
// should immediately be re-pending and can be requested again without
waiting
assertRequestTable(tableId1, tablePath1, 2);
diff --git a/website/docs/maintenance/observability/monitor-metrics.md
b/website/docs/maintenance/observability/monitor-metrics.md
index 8bca85f66..d079c416e 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
</thead>
<tbody>
<tr>
- <th rowspan="19"><strong>coordinator</strong></th>
+ <th rowspan="24"><strong>coordinator</strong></th>
<td style={{textAlign: 'center', verticalAlign: 'middle' }}
rowspan="10">-</td>
<td>activeCoordinatorCount</td>
<td>The number of active CoordinatorServer in this cluster.</td>
@@ -395,6 +395,32 @@ Some metrics might not be exposed when using other JVM
implementations (e.g. IBM
<td>The number of tables currently being tiered.</td>
<td>Gauge</td>
</tr>
+ <tr>
+ <td rowspan="5">lakeTiering_table</td>
+ <td>tierLag</td>
+ <td>Time in milliseconds since the last successful tiering operation for
this table. For newly registered tables that have never completed a tiering
round, the lag is measured from the time the table was registered.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>tierDuration</td>
+ <td>Duration in milliseconds of the last tiering operation for this
table.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>failuresTotal</td>
+ <td>The total number of tiering failures for this table.</td>
+ <td>Counter</td>
+ </tr>
+ <tr>
+ <td>fileSize</td>
+  <td>Cumulative total file size (in bytes) of the lake table after the last tiering round. Returns -1 if no tiering round has completed yet or the lake implementation does not report this statistic.</td>
+ <td>Gauge</td>
+ </tr>
+ <tr>
+ <td>recordCount</td>
+  <td>Cumulative total record count of the lake table after the last tiering round. Returns -1 if no tiering round has completed yet or the lake implementation does not report this statistic.</td>
+ <td>Gauge</td>
+ </tr>
</tbody>
</table>