This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new f7fa97169 [lake/tiering] Add per-table monitoring metrics for Lake 
Tiering (#2454)
f7fa97169 is described below

commit f7fa971694cf9fe81c9ede5f74302952e4dcd70b
Author: Junbo Wang <[email protected]>
AuthorDate: Fri Mar 13 20:36:49 2026 +0800

    [lake/tiering] Add per-table monitoring metrics for Lake Tiering (#2454)
---
 .../fluss/lake/committer/LakeCommitResult.java     |  50 ++++-
 .../apache/fluss/lake/committer/TieringStats.java  |  77 ++++++++
 .../java/org/apache/fluss/metrics/MetricNames.java |   7 +
 .../tiering/committer/TieringCommitOperator.java   |  65 ++++---
 .../flink/tiering/event/FinishedTieringEvent.java  |  19 +-
 .../source/enumerator/TieringSourceEnumerator.java |  49 +++--
 .../lake/iceberg/tiering/IcebergLakeCommitter.java |   1 +
 .../lake/lance/tiering/LanceLakeCommitter.java     |   1 +
 .../lake/paimon/tiering/PaimonLakeCommitter.java   |  40 +++-
 fluss-rpc/src/main/proto/FlussApi.proto            |  10 +
 .../server/coordinator/CoordinatorService.java     |  13 +-
 .../coordinator/LakeTableTieringManager.java       | 206 +++++++++++++++++++--
 .../metrics/group/LakeTieringMetricGroup.java      |  56 ++++++
 .../coordinator/LakeTableTieringManagerTest.java   | 104 ++++++++++-
 .../maintenance/observability/monitor-metrics.md   |  28 ++-
 15 files changed, 662 insertions(+), 64 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java
 
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java
index 88005f62f..737079d52 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/LakeCommitResult.java
@@ -61,23 +61,38 @@ public class LakeCommitResult {
     // 2: committedIsReadable is true, committed snapshot is just also readable
     @Nullable private final ReadableSnapshot readableSnapshot;
 
+    @Nullable private final TieringStats tieringStats;
+
     private LakeCommitResult(
             long committedSnapshotId,
             boolean committedIsReadable,
             @Nullable ReadableSnapshot readableSnapshot,
-            @Nullable Long earliestSnapshotIDToKeep) {
+            @Nullable Long earliestSnapshotIDToKeep,
+            @Nullable TieringStats tieringStats) {
         this.committedSnapshotId = committedSnapshotId;
         this.committedIsReadable = committedIsReadable;
         this.readableSnapshot = readableSnapshot;
         this.earliestSnapshotIDToKeep = earliestSnapshotIDToKeep;
+        this.tieringStats = tieringStats;
     }
 
     public static LakeCommitResult committedIsReadable(long 
committedSnapshotId) {
-        return new LakeCommitResult(committedSnapshotId, true, null, 
KEEP_LATEST);
+        return committedIsReadable(committedSnapshotId, null);
+    }
+
+    public static LakeCommitResult committedIsReadable(
+            long committedSnapshotId, @Nullable TieringStats tieringStats) {
+        return new LakeCommitResult(committedSnapshotId, true, null, 
KEEP_LATEST, tieringStats);
     }
 
     public static LakeCommitResult unknownReadableSnapshot(long 
committedSnapshotId) {
-        return new LakeCommitResult(committedSnapshotId, false, null, 
KEEP_ALL_PREVIOUS);
+        return unknownReadableSnapshot(committedSnapshotId, null);
+    }
+
+    public static LakeCommitResult unknownReadableSnapshot(
+            long committedSnapshotId, @Nullable TieringStats tieringStats) {
+        return new LakeCommitResult(
+                committedSnapshotId, false, null, KEEP_ALL_PREVIOUS, 
tieringStats);
     }
 
     public static LakeCommitResult withReadableSnapshot(
@@ -89,12 +104,29 @@ public class LakeCommitResult {
             // the readable log end offset for readable snapshot
             Map<TableBucket, Long> readableLogEndOffsets,
             @Nullable Long earliestSnapshotIDToKeep) {
+        return withReadableSnapshot(
+                committedSnapshotId,
+                readableSnapshotId,
+                tieredLogEndOffsets,
+                readableLogEndOffsets,
+                earliestSnapshotIDToKeep,
+                null);
+    }
+
+    public static LakeCommitResult withReadableSnapshot(
+            long committedSnapshotId,
+            long readableSnapshotId,
+            Map<TableBucket, Long> tieredLogEndOffsets,
+            Map<TableBucket, Long> readableLogEndOffsets,
+            @Nullable Long earliestSnapshotIDToKeep,
+            @Nullable TieringStats tieringStats) {
         return new LakeCommitResult(
                 committedSnapshotId,
                 false,
                 new ReadableSnapshot(
                         readableSnapshotId, tieredLogEndOffsets, 
readableLogEndOffsets),
-                earliestSnapshotIDToKeep);
+                earliestSnapshotIDToKeep,
+                tieringStats);
     }
 
     public long getCommittedSnapshotId() {
@@ -110,6 +142,16 @@ public class LakeCommitResult {
         return readableSnapshot;
     }
 
+    /**
+     * Gets the tiering stats.
+     *
+     * @return the tiering stats
+     */
+    @Nullable
+    public TieringStats getTieringStats() {
+        return tieringStats;
+    }
+
     /**
      * Gets the earliest snapshot ID to keep.
      *
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/lake/committer/TieringStats.java 
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/TieringStats.java
new file mode 100644
index 000000000..32f0b11d9
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/lake/committer/TieringStats.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.committer;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/**
+ * Immutable statistics for a single completed tiering round of a lake table.
+ *
+ * <p>Fields use {@code null} to represent "unknown / not supported".
+ */
+public final class TieringStats implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * A {@code TieringStats} instance where every field is {@code null} 
(unknown/unsupported). Use
+     * this as the default when no stats are available.
+     */
+    public static final TieringStats UNKNOWN = new TieringStats(null, null);
+
+    // 
-----------------------------------------------------------------------------------------
+    // Lake data stats (reported by the lake committer)
+    // 
-----------------------------------------------------------------------------------------
+
+    /** Cumulative total file size (bytes) of the lake table after this 
tiering round. */
+    @Nullable private final Long fileSize;
+
+    /** Cumulative total record count of the lake table after this tiering 
round. */
+    @Nullable private final Long recordCount;
+
+    public TieringStats(@Nullable Long fileSize, @Nullable Long recordCount) {
+        this.fileSize = fileSize;
+        this.recordCount = recordCount;
+    }
+
+    @Nullable
+    public Long getFileSize() {
+        return fileSize;
+    }
+
+    @Nullable
+    public Long getRecordCount() {
+        return recordCount;
+    }
+
+    /**
+     * Returns {@code true} when at least one stat field is non-{@code null}, 
meaning actual data
+     * was written during this tiering round.
+     */
+    public boolean isAvailableStats() {
+        return fileSize != null || recordCount != null;
+    }
+
+    @Override
+    public String toString() {
+        return "TieringStats{" + "fileSize=" + fileSize + ", recordCount=" + 
recordCount + '}';
+    }
+}
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java 
b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
index 1b1445665..b11c3b92f 100644
--- a/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
+++ b/fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java
@@ -63,6 +63,13 @@ public class MetricNames {
     public static final String LAKE_TIERING_PENDING_TABLES_COUNT = 
"pendingTablesCount";
     public static final String LAKE_TIERING_RUNNING_TABLES_COUNT = 
"runningTablesCount";
 
+    // for lake tiering table-level metrics
+    public static final String LAKE_TIERING_TABLE_TIER_LAG = "tierLag";
+    public static final String LAKE_TIERING_TABLE_TIER_DURATION = 
"tierDuration";
+    public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = 
"failuresTotal";
+    public static final String LAKE_TIERING_TABLE_FILE_SIZE = "fileSize";
+    public static final String LAKE_TIERING_TABLE_RECORD_COUNT = "recordCount";
+
     // 
--------------------------------------------------------------------------------------------
     // metrics for tablet server
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
index 597becb75..2f3a69a7e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -30,6 +30,7 @@ import org.apache.fluss.flink.tiering.source.TieringSource;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitResult;
 import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.committer.TieringStats;
 import org.apache.fluss.lake.writer.LakeTieringFactory;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.metadata.TableBucket;
@@ -94,6 +95,22 @@ public class TieringCommitOperator<WriteResult, Committable>
     private final Map<Long, List<TableBucketWriteResult<WriteResult>>>
             collectedTableBucketWriteResults;
 
+    /**
+     * The result of one table's commit round, holding the lake committable 
(nullable for empty
+     * commits where no data was written) and the associated tiering 
statistics.
+     */
+    private final class CommitResult {
+        /** The lake committable, or {@code null} if nothing was written in 
this round. */
+        @Nullable final Committable committable;
+        /** Per-table tiering statistics collected during this round. */
+        @Nullable final TieringStats stats;
+
+        CommitResult(@Nullable Committable committable, @Nullable TieringStats 
stats) {
+            this.committable = committable;
+            this.stats = stats;
+        }
+    }
+
     public TieringCommitOperator(
             StreamOperatorParameters<CommittableMessage<Committable>> 
parameters,
             Configuration flussConf,
@@ -135,18 +152,20 @@ public class TieringCommitOperator<WriteResult, 
Committable>
 
         if (committableWriteResults != null) {
             try {
-                Committable committable =
+                CommitResult commitResult =
                         commitWriteResults(
                                 tableId,
                                 tableBucketWriteResult.tablePath(),
                                 committableWriteResults);
-                // only emit when committable is not-null
-                if (committable != null) {
-                    output.collect(new StreamRecord<>(new 
CommittableMessage<>(committable)));
+                // only emit downstream when actual data was written
+                if (commitResult.committable != null) {
+                    output.collect(
+                            new StreamRecord<>(new 
CommittableMessage<>(commitResult.committable)));
                 }
                 // notify that the table id has been finished tier
                 operatorEventGateway.sendEventToCoordinator(
-                        new SourceEventWrapper(new 
FinishedTieringEvent(tableId)));
+                        new SourceEventWrapper(
+                                new FinishedTieringEvent(tableId, 
commitResult.stats)));
             } catch (Exception e) {
                 // if any exception happens, send to source coordinator to 
mark it as failed
                 operatorEventGateway.sendEventToCoordinator(
@@ -162,28 +181,31 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         }
     }
 
-    @Nullable
-    private Committable commitWriteResults(
+    /**
+     * Commits the collected write results for one table to the lake and Fluss.
+     *
+     * <p>Always returns a non-null {@link CommitResult}. When all buckets 
produced no data (empty
+     * commit), {@link CommitResult#committable} is {@code null} and stats are 
{@link
+     * TieringStats#UNKNOWN}.
+     */
+    private CommitResult commitWriteResults(
             long tableId,
             TablePath tablePath,
             List<TableBucketWriteResult<WriteResult>> committableWriteResults)
             throws Exception {
-        // filter out non-null write result
-        committableWriteResults =
+        // filter down to buckets that actually produced data
+        List<TableBucketWriteResult<WriteResult>> nonEmptyResults =
                 committableWriteResults.stream()
-                        .filter(
-                                writeResultTableBucketWriteResult ->
-                                        
writeResultTableBucketWriteResult.writeResult() != null)
+                        .filter(r -> r.writeResult() != null)
                         .collect(Collectors.toList());
 
-        // empty, means all write result is null, which is a empty commit,
-        // return null to skip the empty commit
-        if (committableWriteResults.isEmpty()) {
+        // all buckets were empty — nothing to commit to the lake
+        if (nonEmptyResults.isEmpty()) {
             LOG.info(
                     "Commit tiering write results is empty for table {}, table 
path {}",
                     tableId,
                     tablePath);
-            return null;
+            return new CommitResult(null, null);
         }
 
         // Check if the table was dropped and recreated during tiering.
@@ -202,18 +224,15 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         try (LakeCommitter<WriteResult, Committable> lakeCommitter =
                 lakeTieringFactory.createLakeCommitter(
                         new TieringCommitterInitContext(
-                                tablePath,
-                                admin.getTableInfo(tablePath).get(),
-                                lakeTieringConfig,
-                                flussConfig))) {
+                                tablePath, currentTableInfo, 
lakeTieringConfig, flussConfig))) {
             List<WriteResult> writeResults =
-                    committableWriteResults.stream()
+                    nonEmptyResults.stream()
                             .map(TableBucketWriteResult::writeResult)
                             .collect(Collectors.toList());
 
             Map<TableBucket, Long> logEndOffsets = new HashMap<>();
             Map<TableBucket, Long> logMaxTieredTimestamps = new HashMap<>();
-            for (TableBucketWriteResult<WriteResult> writeResult : 
committableWriteResults) {
+            for (TableBucketWriteResult<WriteResult> writeResult : 
nonEmptyResults) {
                 TableBucket tableBucket = writeResult.tableBucket();
                 logEndOffsets.put(tableBucket, writeResult.logEndOffset());
                 logMaxTieredTimestamps.put(tableBucket, 
writeResult.maxTimestamp());
@@ -251,7 +270,7 @@ public class TieringCommitOperator<WriteResult, Committable>
                     lakeBucketTieredOffsetsFile,
                     logEndOffsets,
                     logMaxTieredTimestamps);
-            return committable;
+            return new CommitResult(committable, 
lakeCommitResult.getTieringStats());
         }
     }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java
index 0f10dc699..a1b409322 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/event/FinishedTieringEvent.java
@@ -17,8 +17,12 @@
 
 package org.apache.fluss.flink.tiering.event;
 
+import org.apache.fluss.lake.committer.TieringStats;
+
 import org.apache.flink.api.connector.source.SourceEvent;
 
+import javax.annotation.Nullable;
+
 /** SourceEvent used to represent a Fluss table has been tiered finished. */
 public class FinishedTieringEvent implements SourceEvent {
 
@@ -26,11 +30,24 @@ public class FinishedTieringEvent implements SourceEvent {
 
     private final long tableId;
 
-    public FinishedTieringEvent(long tableId) {
+    /** Statistics collected during this tiering round. */
+    @Nullable private final TieringStats stats;
+
+    public FinishedTieringEvent(long tableId, @Nullable TieringStats stats) {
         this.tableId = tableId;
+        this.stats = stats;
+    }
+
+    public FinishedTieringEvent(long tableId) {
+        this(tableId, null);
     }
 
     public long getTableId() {
         return tableId;
     }
+
+    @Nullable
+    public TieringStats getStats() {
+        return stats;
+    }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
index 33615039b..337222f4e 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java
@@ -30,6 +30,7 @@ import 
org.apache.fluss.flink.tiering.event.TieringReachMaxDurationEvent;
 import org.apache.fluss.flink.tiering.source.split.TieringSplit;
 import org.apache.fluss.flink.tiering.source.split.TieringSplitGenerator;
 import 
org.apache.fluss.flink.tiering.source.state.TieringSourceEnumeratorState;
+import org.apache.fluss.lake.committer.TieringStats;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.rpc.GatewayClientProxy;
@@ -38,6 +39,7 @@ import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
 import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
 import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable;
+import org.apache.fluss.rpc.messages.PbLakeTieringStats;
 import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
 import org.apache.fluss.rpc.metrics.ClientMetricGroup;
 import org.apache.fluss.utils.MapUtils;
@@ -272,10 +274,10 @@ public class TieringSourceEnumerator
                         finishedTableId);
             } else {
                 boolean isForceFinished = 
tieringReachMaxDurationsTables.remove(finishedTableId);
-                LOG.info("Before finishedTables table {}.", finishedTables);
                 finishedTables.put(
-                        finishedTableId, TieringFinishInfo.from(tieringEpoch, 
isForceFinished));
-                LOG.info("After finishedTables table {}.", finishedTables);
+                        finishedTableId,
+                        TieringFinishInfo.from(
+                                tieringEpoch, isForceFinished, 
finishedTieringEvent.getStats()));
             }
         }
 
@@ -578,17 +580,32 @@ public class TieringSourceEnumerator
                         toPbHeartbeatReqForTable(tieringTableEpochs, 
coordinatorEpoch));
             }
             if (!finishedTables.isEmpty()) {
-                Map<Long, Long> finishTieringEpochs = new HashMap<>();
                 Set<Long> forceFinishedTables = new HashSet<>();
+                List<PbHeartbeatReqForTable> finishedTableReqs = new 
ArrayList<>();
                 finishedTables.forEach(
                         (tableId, tieringFinishInfo) -> {
-                            finishTieringEpochs.put(tableId, 
tieringFinishInfo.tieringEpoch);
                             if (tieringFinishInfo.isForceFinished) {
                                 forceFinishedTables.add(tableId);
                             }
+                            PbHeartbeatReqForTable pbHeartbeatReqForTable =
+                                    new PbHeartbeatReqForTable()
+                                            .setTableId(tableId)
+                                            
.setCoordinatorEpoch(coordinatorEpoch)
+                                            
.setTieringEpoch(tieringFinishInfo.tieringEpoch);
+                            TieringStats stats = tieringFinishInfo.stats;
+                            if (stats.isAvailableStats()) {
+                                PbLakeTieringStats pbLakeTieringStats = new 
PbLakeTieringStats();
+                                if (stats.getFileSize() != null) {
+                                    
pbLakeTieringStats.setFileSize(stats.getFileSize());
+                                }
+                                if (stats.getRecordCount() != null) {
+                                    
pbLakeTieringStats.setRecordCount(stats.getRecordCount());
+                                }
+                                
pbHeartbeatReqForTable.setLakeTieringStats(pbLakeTieringStats);
+                            }
+                            finishedTableReqs.add(pbHeartbeatReqForTable);
                         });
-                heartbeatRequest.addAllFinishedTables(
-                        toPbHeartbeatReqForTable(finishTieringEpochs, 
coordinatorEpoch));
+                heartbeatRequest.addAllFinishedTables(finishedTableReqs);
                 for (long forceFinishedTableId : forceFinishedTables) {
                     
heartbeatRequest.addForceFinishedTable(forceFinishedTableId);
                 }
@@ -638,26 +655,32 @@ public class TieringSourceEnumerator
 
     private static class TieringFinishInfo {
         /** The epoch of the tiering operation for this table. */
-        long tieringEpoch;
+        final long tieringEpoch;
 
         /**
          * Whether this table was force finished due to reaching the maximum 
tiering duration. When
          * a table's tiering operation exceeds the max duration (data lake 
freshness), it will be
          * force finished to prevent it from blocking other tables' tiering 
operations.
          */
-        boolean isForceFinished;
+        final boolean isForceFinished;
+
+        /** Stats collected during this tiering round. */
+        final TieringStats stats;
 
         public static TieringFinishInfo from(long tieringEpoch) {
-            return new TieringFinishInfo(tieringEpoch, false);
+            return new TieringFinishInfo(tieringEpoch, false, null);
         }
 
-        public static TieringFinishInfo from(long tieringEpoch, boolean 
isForceFinished) {
-            return new TieringFinishInfo(tieringEpoch, isForceFinished);
+        public static TieringFinishInfo from(
+                long tieringEpoch, boolean isForceFinished, @Nullable 
TieringStats stats) {
+            return new TieringFinishInfo(tieringEpoch, isForceFinished, stats);
         }
 
-        private TieringFinishInfo(long tieringEpoch, boolean isForceFinished) {
+        private TieringFinishInfo(
+                long tieringEpoch, boolean isForceFinished, @Nullable 
TieringStats stats) {
             this.tieringEpoch = tieringEpoch;
             this.isForceFinished = isForceFinished;
+            this.stats = stats != null ? stats : TieringStats.UNKNOWN;
         }
     }
 }
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index e96a8be33..06b572760 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -141,6 +141,7 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
                     snapshotId = rewriteCommitSnapshotId;
                 }
             }
+            // Iceberg does not provide cumulative table stats API yet; leave 
stats as -1 (unknown).
             return LakeCommitResult.committedIsReadable(snapshotId);
         } catch (Exception e) {
             throw new IOException("Failed to commit to Iceberg table.", e);
diff --git 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
index 47c163cdb..4fb7b96f0 100644
--- 
a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeCommitter.java
@@ -78,6 +78,7 @@ public class LanceLakeCommitter implements 
LakeCommitter<LanceWriteResult, Lance
         properties.put(committerName, FLUSS_LAKE_TIERING_COMMIT_USER);
         long snapshotId =
                 LanceDatasetAdapter.commitAppend(config, 
committable.committable(), properties);
+        // Lance does not provide cumulative table stats API yet; leave stats 
as -1 (unknown).
         return LakeCommitResult.committedIsReadable(snapshotId);
     }
 
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 03ced0db2..1018fe78d 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -23,17 +23,20 @@ import 
org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.CommitterInitContext;
 import org.apache.fluss.lake.committer.LakeCommitResult;
 import org.apache.fluss.lake.committer.LakeCommitter;
+import org.apache.fluss.lake.committer.TieringStats;
 import org.apache.fluss.lake.paimon.utils.DvTableReadableSnapshotRetriever;
 import org.apache.fluss.metadata.TablePath;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.utils.SnapshotManager;
@@ -46,6 +49,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static 
org.apache.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
 import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
@@ -112,9 +116,12 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
                             "Paimon committed snapshot id must be non-null.");
             currentCommitSnapshotId.remove();
 
+            // Collect cumulative table stats from the exact snapshot that was 
just committed.
+            TieringStats stats = computeTableStats();
+
             // deletion vector is disabled, committed snapshot is readable
             if (!fileStoreTable.coreOptions().deletionVectorsEnabled()) {
-                return 
LakeCommitResult.committedIsReadable(committedSnapshotId);
+                return 
LakeCommitResult.committedIsReadable(committedSnapshotId, stats);
             } else {
                 // retrieve the readable snapshot during commit
                 try (DvTableReadableSnapshotRetriever retriever =
@@ -123,7 +130,7 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
                     DvTableReadableSnapshotRetriever.ReadableSnapshotResult 
readableSnapshotResult =
                             
retriever.getReadableSnapshotAndOffsets(committedSnapshotId);
                     if (readableSnapshotResult == null) {
-                        return 
LakeCommitResult.unknownReadableSnapshot(committedSnapshotId);
+                        return 
LakeCommitResult.unknownReadableSnapshot(committedSnapshotId, stats);
                     } else {
                         long earliestSnapshotIdToKeep =
                                 
readableSnapshotResult.getEarliestSnapshotIdToKeep();
@@ -139,7 +146,8 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
                                 readableSnapshotResult.getReadableSnapshotId(),
                                 readableSnapshotResult.getTieredOffsets(),
                                 readableSnapshotResult.getReadableOffsets(),
-                                earliestSnapshotIdToKeep);
+                                earliestSnapshotIdToKeep,
+                                stats);
                     }
                 }
             }
@@ -153,6 +161,32 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
         }
     }
 
+    /** Computes cumulative table stats from the latest snapshot by REST API. 
*/
+    @Nullable
+    private TieringStats computeTableStats() {
+        Identifier identifier =
+                new Identifier(tablePath.getDatabaseName(), 
tablePath.getTableName());
+        try {
+            Optional<TableSnapshot> snapshot = 
paimonCatalog.loadSnapshot(identifier);
+            if (!snapshot.isPresent()) {
+                LOG.warn(
+                        "No snapshot found for table {}, "
+                                + "fileSize and recordCount will be reported 
as -1.",
+                        tablePath);
+                return null;
+            }
+            TableSnapshot tableSnapshot = snapshot.get();
+            return new TieringStats(tableSnapshot.fileSizeInBytes(), 
tableSnapshot.recordCount());
+        } catch (Exception e) {
+            LOG.debug(
+                    "Failed to load snapshot for table {}, "
+                            + "fileSize and recordCount will be reported as 
-1.",
+                    tablePath,
+                    e);
+            return null;
+        }
+    }
+
     @Override
     public void abort(PaimonCommittable committable) throws IOException {
         tableCommit = fileStoreTable.newCommit(FLUSS_LAKE_TIERING_COMMIT_USER);
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index f04b30756..b14238861 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -1063,6 +1063,16 @@ message PbHeartbeatReqForTable {
   required int32 coordinator_epoch = 2;
   // the tiering epoch when the table is assigned to be tiering service
   required int64 tiering_epoch = 3;
+  // stats for the last tiering round; only set for finished_tables in 
LakeTieringHeartbeatRequest
+  optional PbLakeTieringStats lake_tiering_stats = 4;
+}
+
+// Stats reported by the tiering service when a table finishes a tiering round.
+message PbLakeTieringStats {
+  // cumulative total file size (bytes) of the lake table after tiering
+  optional int64 file_size = 1;
+  // cumulative total record count of the lake table after tiering
+  optional int64 record_count = 2;
 }
 
 message PbHeartbeatRespForTable {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 388f1816f..17b6d934c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -41,6 +41,7 @@ import org.apache.fluss.exception.UnknownServerException;
 import org.apache.fluss.exception.UnknownTableOrBucketException;
 import org.apache.fluss.fs.FileSystem;
 import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.lake.committer.TieringStats;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
@@ -106,6 +107,7 @@ import org.apache.fluss.rpc.messages.PbAlterConfig;
 import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable;
 import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable;
 import org.apache.fluss.rpc.messages.PbKvSnapshotLeaseForTable;
+import org.apache.fluss.rpc.messages.PbLakeTieringStats;
 import org.apache.fluss.rpc.messages.PbPrepareLakeTableRespForTable;
 import org.apache.fluss.rpc.messages.PbProducerTableOffsets;
 import org.apache.fluss.rpc.messages.PbTableBucket;
@@ -853,10 +855,19 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                     
heartbeatResponse.addFinishedTableResp().setTableId(finishTable.getTableId());
             try {
                 validateHeartbeatRequest(finishTable, currentCoordinatorEpoch);
+                TieringStats stats = TieringStats.UNKNOWN;
+                if (finishTable.hasLakeTieringStats()) {
+                    PbLakeTieringStats pbStats = 
finishTable.getLakeTieringStats();
+                    stats =
+                            new TieringStats(
+                                    pbStats.hasFileSize() ? 
pbStats.getFileSize() : null,
+                                    pbStats.hasRecordCount() ? 
pbStats.getRecordCount() : null);
+                }
                 lakeTableTieringManager.finishTableTiering(
                         finishTable.getTableId(),
                         finishTable.getTieringEpoch(),
-                        
forceFinishedTableId.contains(finishTable.getTableId()));
+                        
forceFinishedTableId.contains(finishTable.getTableId()),
+                        stats);
             } catch (Throwable e) {
                 
pbHeartbeatRespForTable.setError(ApiError.fromThrowable(e).toErrorResponse());
             }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
index 6d412f5b7..a3448d148 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java
@@ -21,9 +21,12 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.exception.FencedTieringEpochException;
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.lake.committer.TieringStats;
 import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.metrics.Counter;
 import org.apache.fluss.metrics.MetricNames;
+import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.server.entity.LakeTieringTableInfo;
 import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
 import org.apache.fluss.server.utils.timer.DefaultTimer;
@@ -54,6 +57,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.ToLongFunction;
 
 import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
 import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
@@ -128,8 +132,14 @@ public class LakeTableTieringManager implements 
AutoCloseable {
     // cache table_id -> table tiering epoch
     private final Map<Long, Long> tableTierEpoch;
 
-    // table_id -> the last timestamp of tiered lake snapshot
-    private final Map<Long, Long> tableLastTieredTime;
+    // table_id -> result snapshot of the last completed tiering round
+    private final Map<Long, LastTieringResult> lastTieringResult;
+
+    // table_id -> the tiering failure counter
+    private final Map<Long, Counter> tableFailureCounters;
+
+    // table_id -> start time (ms) of the currently in-progress tiering round
+    private final Map<Long, Long> currentTieringStartTime;
 
     // the live tables that are tiering,
     // from table_id -> last heartbeat time by the tiering service
@@ -170,8 +180,10 @@ public class LakeTableTieringManager implements 
AutoCloseable {
         this.lakeTieringServiceTimeoutChecker.scheduleWithFixedDelay(
                 this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS);
         this.tableTierEpoch = new HashMap<>();
-        this.tableLastTieredTime = new HashMap<>();
+        this.lastTieringResult = new HashMap<>();
         this.delayedTieringByTableId = new HashMap<>();
+        this.tableFailureCounters = new HashMap<>();
+        this.currentTieringStartTime = new HashMap<>();
         this.tieringMetricGroup = lakeTieringMetricGroup;
         registerMetrics();
     }
@@ -218,14 +230,18 @@ public class LakeTableTieringManager implements 
AutoCloseable {
         tablePaths.put(tableId, tableInfo.getTablePath());
         tableLakeFreshness.put(
                 tableId, 
tableInfo.getTableConfig().getDataLakeFreshness().toMillis());
-        tableLastTieredTime.put(tableId, lastTieredTime);
+        lastTieringResult.put(tableId, 
LastTieringResult.initial(lastTieredTime));
         tableTierEpoch.put(tableId, 0L);
+
+        // register table-level metrics
+        registerTableMetrics(tableId, tableInfo.getTablePath());
     }
 
+    @GuardedBy("lock")
     private void scheduleTableTiering(long tableId) {
         Long freshnessInterval = tableLakeFreshness.get(tableId);
-        Long lastTieredTime = tableLastTieredTime.get(tableId);
-        if (freshnessInterval == null || lastTieredTime == null) {
+        LastTieringResult lastResult = lastTieringResult.get(tableId);
+        if (freshnessInterval == null || lastResult == null) {
             // the table has been dropped, return directly
             return;
         }
@@ -234,7 +250,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
         if (existingDelayedTiering != null) {
             existingDelayedTiering.cancel();
         }
-        long delayMs = freshnessInterval - (clock.milliseconds() - 
lastTieredTime);
+        long delayMs = freshnessInterval - (clock.milliseconds() - 
lastResult.tieredTime);
         // if the delayMs is < 0, the DelayedTiering will be triggered at once 
without
         // adding into timing wheel.
         DelayedTiering delayedTiering = new DelayedTiering(tableId, delayMs);
@@ -242,13 +258,67 @@ public class LakeTableTieringManager implements 
AutoCloseable {
         lakeTieringScheduleTimer.add(delayedTiering);
     }
 
+    private void registerTableMetrics(long tableId, TablePath tablePath) {
+        // create table-level metric group
+        MetricGroup tableMetricGroup =
+                tieringMetricGroup.addTableLakeTieringMetricGroup(tableId, 
tablePath);
+
+        // tierLag: milliseconds since last successful tiering
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_TIER_LAG,
+                () ->
+                        inReadLock(
+                                lock,
+                                () -> {
+                                    LastTieringResult r = 
lastTieringResult.get(tableId);
+                                    return r != null ? clock.milliseconds() - 
r.tieredTime : -1L;
+                                }));
+
+        // tierDuration: duration of last tiering job
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_TIER_DURATION,
+                () -> inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.tierDuration)));
+
+        // failuresTotal: total failure count for this table
+        Counter failuresCounter =
+                
tableMetricGroup.counter(MetricNames.LAKE_TIERING_TABLE_FAILURES_TOTAL);
+        tableFailureCounters.put(tableId, failuresCounter);
+
+        // fileSize: cumulative total file size of the lake table after the 
last tiering
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_FILE_SIZE,
+                () -> inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.fileSize)));
+
+        // recordCount: cumulative total record count of the lake table after 
the last tiering
+        tableMetricGroup.gauge(
+                MetricNames.LAKE_TIERING_TABLE_RECORD_COUNT,
+                () -> inReadLock(lock, () -> getLastResultField(tableId, r -> 
r.recordCount)));
+    }
+
+    /**
+     * Returns the value of a single field from the {@link LastTieringResult} 
for the given table,
+     * or {@code -1} if the table has no completed tiering result (e.g. not 
yet tiered or already
+     * removed).
+     *
+     * <p>Must be called under {@link #lock} (read or write).
+     */
+    @GuardedBy("lock")
+    long getLastResultField(long tableId, ToLongFunction<LastTieringResult> 
fieldExtractor) {
+        LastTieringResult r = lastTieringResult.get(tableId);
+        return r != null ? fieldExtractor.applyAsLong(r) : -1L;
+    }
+
     public void removeLakeTable(long tableId) {
         inWriteLock(
                 lock,
                 () -> {
                     tablePaths.remove(tableId);
                     tableLakeFreshness.remove(tableId);
-                    tableLastTieredTime.remove(tableId);
+                    lastTieringResult.remove(tableId);
+                    currentTieringStartTime.remove(tableId);
+                    tableFailureCounters.remove(tableId);
+                    // close and remove the metric group to unregister metrics
+                    
tieringMetricGroup.removeTableLakeTieringMetricGroup(tableId);
                     tieringStates.remove(tableId);
                     liveTieringTableIds.remove(tableId);
                     tableTierEpoch.remove(tableId);
@@ -350,11 +420,13 @@ public class LakeTableTieringManager implements 
AutoCloseable {
                 });
     }
 
-    public void finishTableTiering(long tableId, long tieredEpoch, boolean 
isForceFinished) {
+    public void finishTableTiering(
+            long tableId, long tieredEpoch, boolean isForceFinished, 
TieringStats stats) {
         inWriteLock(
                 lock,
                 () -> {
                     validateTieringServiceRequest(tableId, tieredEpoch);
+                    updateTableTieringResult(tableId, stats);
                     // to tiered state firstly
                     doHandleStateChange(tableId, TieringState.Tiered);
                     if (isForceFinished) {
@@ -367,6 +439,34 @@ public class LakeTableTieringManager implements 
AutoCloseable {
                 });
     }
 
+    @GuardedBy("lock")
+    private void updateTableTieringResult(long tableId, TieringStats stats) {
+        Long startTime = currentTieringStartTime.get(tableId);
+        long now = clock.milliseconds();
+        long duration = startTime != null ? now - startTime : -1L;
+        if (stats.isAvailableStats()) {
+            lastTieringResult.put(
+                    tableId,
+                    new LastTieringResult(
+                            now,
+                            duration,
+                            stats.getFileSize() != null ? stats.getFileSize() 
: -1L,
+                            stats.getRecordCount() != null ? 
stats.getRecordCount() : -1L));
+        } else {
+            lastTieringResult.compute(
+                    tableId,
+                    (k, prev) -> {
+                        if (prev == null) {
+                            throw new IllegalStateException(
+                                    "Last tiering result not found for table " 
+ tableId);
+                        } else {
+                            return new LastTieringResult(
+                                    now, duration, prev.fileSize, 
prev.recordCount);
+                        }
+                    });
+        }
+    }
+
     public void reportTieringFail(long tableId, long tieringEpoch) {
         inWriteLock(
                 lock,
@@ -482,13 +582,19 @@ public class LakeTableTieringManager implements 
AutoCloseable {
                 break;
             case Tiering:
                 liveTieringTableIds.put(tableId, clock.milliseconds());
+                currentTieringStartTime.put(tableId, clock.milliseconds());
                 break;
             case Tiered:
-                tableLastTieredTime.put(tableId, clock.milliseconds());
                 liveTieringTableIds.remove(tableId);
+                currentTieringStartTime.remove(tableId);
                 break;
             case Failed:
+                Counter counter = tableFailureCounters.get(tableId);
+                if (counter != null) {
+                    counter.inc();
+                }
                 liveTieringTableIds.remove(tableId);
+                currentTieringStartTime.remove(tableId);
                 // do nothing
                 break;
         }
@@ -514,7 +620,7 @@ public class LakeTableTieringManager implements 
AutoCloseable {
                 "Successfully changed tiering state for table {} from {} to 
{}.",
                 tableId,
                 fromState,
-                fromState);
+                toState);
     }
 
     @Override
@@ -636,13 +742,87 @@ public class LakeTableTieringManager implements 
AutoCloseable {
         abstract Set<TieringState> validPreviousStates();
     }
 
+    /**
+     * Immutable snapshot of the statistics collected at the end of one 
completed tiering round.
+     * Written atomically when a table transitions to {@link 
TieringState#Tiered}.
+     */
     @VisibleForTesting
-    protected int getPendingTablesCount() {
+    static class LastTieringResult {
+
+        /** Timestamp (ms) when this tiering round completed. */
+        final long tieredTime;
+
+        /** Wall-clock duration (ms) of this tiering round; {@code -1} if 
unknown. */
+        final long tierDuration;
+
+        /**
+         * Cumulative total file size (bytes) of the lake table after this 
round; {@code -1} if not
+         * reported by the lake implementation (converted from {@code null} in 
{@link
+         * TieringStats}).
+         */
+        final long fileSize;
+
+        /**
+         * Cumulative total record count of the lake table after this round; 
{@code -1} if not
+         * reported by the lake implementation (converted from {@code null} in 
{@link
+         * TieringStats}).
+         */
+        final long recordCount;
+
+        LastTieringResult(long tieredTime, long tierDuration, long fileSize, 
long recordCount) {
+            this.tieredTime = tieredTime;
+            this.tierDuration = tierDuration;
+            this.fileSize = fileSize;
+            this.recordCount = recordCount;
+        }
+
+        /**
+         * Creates the initial placeholder used when a table is first 
registered. Only {@code
+         * tieredTime} is meaningful; all other stats are {@code -1} until the 
first round
+         * completes.
+         */
+        static LastTieringResult initial(long tieredTime) {
+            return new LastTieringResult(tieredTime, -1L, -1L, -1L);
+        }
+    }
+
+    @VisibleForTesting
+    int getPendingTablesCount() {
         return inReadLock(lock, pendingTieringTables::size);
     }
 
     @VisibleForTesting
-    protected int getRunningTablesCount() {
+    int getRunningTablesCount() {
         return inReadLock(lock, liveTieringTableIds::size);
     }
+
+    @VisibleForTesting
+    Long getTableLastSuccessTime(long tableId) {
+        return inReadLock(
+                lock,
+                () -> {
+                    LastTieringResult r = lastTieringResult.get(tableId);
+                    return r != null ? r.tieredTime : null;
+                });
+    }
+
+    @VisibleForTesting
+    Long getTableFailureCount(long tableId) {
+        return inReadLock(
+                lock,
+                () -> {
+                    Counter c = tableFailureCounters.get(tableId);
+                    return c != null ? c.getCount() : 0L;
+                });
+    }
+
+    @VisibleForTesting
+    TieringState getTableState(long tableId) {
+        return inReadLock(lock, () -> tieringStates.get(tableId));
+    }
+
+    @VisibleForTesting
+    long getLastTieringResultField(long tableId, 
ToLongFunction<LastTieringResult> fieldExtractor) {
+        return inReadLock(lock, () -> getLastResultField(tableId, 
fieldExtractor));
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
index bc59b11ff..85724cfca 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/LakeTieringMetricGroup.java
@@ -18,10 +18,15 @@
 
 package org.apache.fluss.server.metrics.group;
 
+import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.metrics.CharacterFilter;
 import org.apache.fluss.metrics.groups.AbstractMetricGroup;
+import org.apache.fluss.metrics.groups.MetricGroup;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import static org.apache.fluss.metrics.utils.MetricGroupUtils.makeScope;
 
 /** Metrics for lake tiering. */
@@ -29,6 +34,8 @@ public class LakeTieringMetricGroup extends 
AbstractMetricGroup {
 
     private static final String NAME = "lakeTiering";
 
+    private final Map<Long, TableMetricGroup> metricGroupByTable = new 
HashMap<>();
+
     public LakeTieringMetricGroup(MetricRegistry registry, 
CoordinatorMetricGroup parent) {
         super(registry, makeScope(parent, NAME), parent);
     }
@@ -37,4 +44,53 @@ public class LakeTieringMetricGroup extends 
AbstractMetricGroup {
     protected String getGroupName(CharacterFilter filter) {
         return NAME;
     }
+
+    // ------------------------------------------------------------------------
+    //  table lake tiering groups
+    // ------------------------------------------------------------------------
+    public MetricGroup addTableLakeTieringMetricGroup(long tableId, TablePath 
tablePath) {
+        return metricGroupByTable.computeIfAbsent(
+                tableId, table -> new TableMetricGroup(registry, this, 
tablePath, tableId));
+    }
+
+    public void removeTableLakeTieringMetricGroup(long tableId) {
+        // get the metric group of the table
+        TableMetricGroup tableMetricGroup = metricGroupByTable.get(tableId);
+        // if get the table metric group
+        if (tableMetricGroup != null) {
+            tableMetricGroup.close();
+            metricGroupByTable.remove(tableId);
+        }
+    }
+
+    /** The metric group for table. */
+    public static class TableMetricGroup extends AbstractMetricGroup {
+
+        private static final String NAME = "table";
+
+        private final TablePath tablePath;
+        private final long tableId;
+
+        public TableMetricGroup(
+                MetricRegistry registry,
+                LakeTieringMetricGroup parent,
+                TablePath tablePath,
+                long tableId) {
+            super(registry, makeScope(parent, NAME), parent);
+            this.tablePath = tablePath;
+            this.tableId = tableId;
+        }
+
+        @Override
+        protected void putVariables(Map<String, String> variables) {
+            variables.put("database", tablePath.getDatabaseName());
+            variables.put("table", tablePath.getTableName());
+            variables.put("tableId", String.valueOf(tableId));
+        }
+
+        @Override
+        protected String getGroupName(CharacterFilter filter) {
+            return NAME;
+        }
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
index f0ab38f78..d615f31b2 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/LakeTableTieringManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.coordinator;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.exception.FencedTieringEpochException;
 import org.apache.fluss.exception.TableNotExistException;
+import org.apache.fluss.lake.committer.TieringStats;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TableInfo;
@@ -133,7 +134,10 @@ class LakeTableTieringManagerTest {
         assertThatThrownBy(() -> 
tableTieringManager.reportTieringFail(tableId1, 1))
                 .isInstanceOf(TableNotExistException.class)
                 .hasMessage("The table %d doesn't exist.", tableId1);
-        assertThatThrownBy(() -> 
tableTieringManager.finishTableTiering(tableId1, 1, false))
+        assertThatThrownBy(
+                        () ->
+                                tableTieringManager.finishTableTiering(
+                                        tableId1, 1, false, 
TieringStats.UNKNOWN))
                 .isInstanceOf(TableNotExistException.class)
                 .hasMessage("The table %d doesn't exist.", tableId1);
     }
@@ -154,7 +158,7 @@ class LakeTableTieringManagerTest {
         assertThat(tableTieringManager.requestTable()).isNull();
 
         // mock lake tiering finish one-round tiering
-        tableTieringManager.finishTableTiering(tableId1, tieredEpoch, false);
+        tableTieringManager.finishTableTiering(tableId1, tieredEpoch, false, 
TieringStats.UNKNOWN);
         // not advance time, request table should return null
         assertThat(tableTieringManager.requestTable()).isNull();
 
@@ -213,12 +217,18 @@ class LakeTableTieringManagerTest {
                 .hasMessage(
                         "The tiering epoch %d is not match current epoch %d in 
coordinator for table %d.",
                         1, 2, tableId1);
-        assertThatThrownBy(() -> 
tableTieringManager.finishTableTiering(tableId1, 1, false))
+        assertThatThrownBy(
+                        () ->
+                                tableTieringManager.finishTableTiering(
+                                        tableId1, 1, false, 
TieringStats.UNKNOWN))
                 .isInstanceOf(FencedTieringEpochException.class)
                 .hasMessage(
                         "The tiering epoch %d is not match current epoch %d in 
coordinator for table %d.",
                         1, 2, tableId1);
-        assertThatThrownBy(() -> 
tableTieringManager.finishTableTiering(tableId1, 3, false))
+        assertThatThrownBy(
+                        () ->
+                                tableTieringManager.finishTableTiering(
+                                        tableId1, 3, false, 
TieringStats.UNKNOWN))
                 .isInstanceOf(FencedTieringEpochException.class)
                 .hasMessage(
                         "The tiering epoch %d is not match current epoch %d in 
coordinator for table %d.",
@@ -278,6 +288,90 @@ class LakeTableTieringManagerTest {
         assertThat(tableTieringManager.getPendingTablesCount()).isEqualTo(1); 
// back to pending
     }
 
+    @Test
+    void testTableLevelMetrics() {
+        long tableId1 = 1L;
+        TablePath tablePath1 = TablePath.of("db", "table1");
+        TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, 
Duration.ofSeconds(10));
+        tableTieringManager.addNewLakeTable(tableInfo1);
+
+        // Initially, table is just created, lastSuccessTime is the current 
time
+        Long initialTime = 
tableTieringManager.getTableLastSuccessTime(tableId1);
+        assertThat(initialTime).isNotNull();
+        assertThat(initialTime).isEqualTo(manualClock.milliseconds());
+
+        // Advance time and request table
+        manualClock.advanceTime(Duration.ofSeconds(10));
+        assertRequestTable(tableId1, tablePath1, 1);
+
+        // State should be Tiering (4)
+        assertThat(tableTieringManager.getTableState(tableId1))
+                .isEqualTo(LakeTableTieringManager.TieringState.Tiering);
+
+        // Simulate tiering duration — finish with UNKNOWN stats (empty 
commit).
+        // Duration should still be updated even when no stats/data are 
reported.
+        manualClock.advanceTime(Duration.ofSeconds(5));
+        tableTieringManager.finishTableTiering(tableId1, 1, false, 
TieringStats.UNKNOWN);
+        assertThat(tableTieringManager.getLastTieringResultField(tableId1, r 
-> r.tierDuration))
+                .isEqualTo(5000L);
+
+        // lastSuccessTime should be just now
+        assertThat(tableTieringManager.getTableLastSuccessTime(tableId1))
+                .isEqualTo(manualClock.milliseconds());
+
+        // fileSize and recordCount should be null (mapped to -1) when not 
reported
+        assertThat(tableTieringManager.getLastTieringResultField(tableId1, r 
-> r.fileSize))
+                .isEqualTo(-1L);
+        assertThat(tableTieringManager.getLastTieringResultField(tableId1, r 
-> r.recordCount))
+                .isEqualTo(-1L);
+
+        // State should be Scheduled (2) after finish
+        assertThat(tableTieringManager.getTableState(tableId1))
+                .isEqualTo(LakeTableTieringManager.TieringState.Scheduled);
+
+        // Advance time to make lastSuccessAge increase
+        manualClock.advanceTime(Duration.ofSeconds(3));
+        long lastSuccessAge =
+                manualClock.milliseconds() - 
tableTieringManager.getTableLastSuccessTime(tableId1);
+        assertThat(lastSuccessAge).isEqualTo(3000L);
+
+        // Request again and finish with valid stats
+        manualClock.advanceTime(Duration.ofSeconds(7));
+        assertRequestTable(tableId1, tablePath1, 2);
+        manualClock.advanceTime(Duration.ofSeconds(4));
+        long expectedFileSize = 1024L * 1024L;
+        long expectedRecordCount = 500L;
+        tableTieringManager.finishTableTiering(
+                tableId1, 2, false, new TieringStats(expectedFileSize, 
expectedRecordCount));
+
+        // fileSize and recordCount should reflect the reported stats
+        assertThat(tableTieringManager.getLastTieringResultField(tableId1, r 
-> r.fileSize))
+                .isEqualTo(expectedFileSize);
+        assertThat(tableTieringManager.getLastTieringResultField(tableId1, r 
-> r.recordCount))
+                .isEqualTo(expectedRecordCount);
+        // duration of the second round should be 4s
+        assertThat(tableTieringManager.getLastTieringResultField(tableId1, r 
-> r.tierDuration))
+                .isEqualTo(4000L);
+
+        // Request again and report failure — stats should remain from last 
successful tiering
+        manualClock.advanceTime(Duration.ofSeconds(10));
+        assertRequestTable(tableId1, tablePath1, 3);
+        tableTieringManager.reportTieringFail(tableId1, 3);
+
+        // Failures should increment
+        
assertThat(tableTieringManager.getTableFailureCount(tableId1)).isEqualTo(1L);
+
+        // fileSize and recordCount should remain unchanged after failure
+        assertThat(tableTieringManager.getLastTieringResultField(tableId1, r 
-> r.fileSize))
+                .isEqualTo(expectedFileSize);
+        assertThat(tableTieringManager.getLastTieringResultField(tableId1, r 
-> r.recordCount))
+                .isEqualTo(expectedRecordCount);
+
+        // State should be Pending (3) after failure
+        assertThat(tableTieringManager.getTableState(tableId1))
+                .isEqualTo(LakeTableTieringManager.TieringState.Pending);
+    }
+
     @Test
     void testForceFinishTableTieringImmediatelyRePending() {
         long tableId1 = 1L;
@@ -292,7 +386,7 @@ class LakeTableTieringManagerTest {
         assertThat(tableTieringManager.requestTable()).isNull();
 
         // mock lake tiering force finish (e.g., due to exceeding tiering 
duration)
-        tableTieringManager.finishTableTiering(tableId1, 1, true);
+        tableTieringManager.finishTableTiering(tableId1, 1, true, 
TieringStats.UNKNOWN);
         // should immediately be re-pending and can be requested again without 
waiting
         assertRequestTable(tableId1, tablePath1, 2);
 
diff --git a/website/docs/maintenance/observability/monitor-metrics.md 
b/website/docs/maintenance/observability/monitor-metrics.md
index 8bca85f66..d079c416e 100644
--- a/website/docs/maintenance/observability/monitor-metrics.md
+++ b/website/docs/maintenance/observability/monitor-metrics.md
@@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
   </thead>
   <tbody>
     <tr>
-       <th rowspan="19"><strong>coordinator</strong></th>
+       <th rowspan="24"><strong>coordinator</strong></th>
       <td style={{textAlign: 'center', verticalAlign: 'middle' }} 
rowspan="10">-</td>
       <td>activeCoordinatorCount</td>
       <td>The number of active CoordinatorServer in this cluster.</td>
@@ -395,6 +395,32 @@ Some metrics might not be exposed when using other JVM 
implementations (e.g. IBM
       <td>The number of tables currently being tiered.</td>
       <td>Gauge</td>
     </tr>
+    <tr>
+      <td rowspan="5">lakeTiering_table</td>
+      <td>tierLag</td>
+      <td>Time in milliseconds since the last successful tiering operation for 
this table. For newly registered tables that have never completed a tiering 
round, the lag is measured from the time the table was registered.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>tierDuration</td>
+      <td>Duration in milliseconds of the last tiering operation for this 
table.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>failuresTotal</td>
+      <td>The total number of tiering failures for this table.</td>
+      <td>Counter</td>
+    </tr>
+    <tr>
+      <td>fileSize</td>
+      <td>Cumulative total file size (in bytes) of the lake table after the 
last tiering round. Returns -1 if no tiering has completed yet.</td>
+      <td>Gauge</td>
+    </tr>
+    <tr>
+      <td>recordCount</td>
+      <td>Cumulative total record count of the lake table after the last 
tiering round. Returns -1 if no tiering has completed yet.</td>
+      <td>Gauge</td>
+    </tr>
   </tbody>
 </table>
 

Reply via email to