This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d2dc98f88 [lake] Store partition name in lake snapshot property (#1485)
d2dc98f88 is described below

commit d2dc98f8881c7d6212775df5682c59e1c1c5f7c9
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 12 20:13:40 2025 +0800

    [lake] Store partition name in lake snapshot property (#1485)
---
 .../fluss/client/metadata/LakeSnapshot.java        |  19 +++-
 .../fluss/client/utils/ClientRpcMessageUtils.java  |   6 +-
 .../alibaba/fluss/lake/committer/BucketOffset.java |  12 +--
 .../lake/committer/CommittedLakeSnapshot.java      |  19 +++-
 .../fluss/metadata/ResolvedPartitionSpec.java      |  19 ++++
 .../fluss/utils/json/BucketOffsetJsonSerde.java    |   4 +-
 .../fluss/metadata/ResolvedPartitionSpecTest.java  |   9 ++
 .../utils/json/BucketOffsetJsonSerdeTest.java      |   5 +-
 .../tiering/committer/FlussTableLakeSnapshot.java  |  24 ++---
 .../committer/FlussTableLakeSnapshotCommitter.java |  40 ++++----
 .../tiering/committer/TieringCommitOperator.java   | 109 +++++++++++++--------
 .../tiering/source/TableBucketWriteResult.java     |  10 ++
 .../source/TableBucketWriteResultSerializer.java   |  10 +-
 .../flink/tiering/source/TieringSplitReader.java   |  19 +++-
 .../FlussTableLakeSnapshotCommitterTest.java       |  43 ++++----
 .../committer/TieringCommitOperatorTest.java       |  47 +++++++--
 .../TableBucketWriteResultSerializerTest.java      |  11 ++-
 .../lake/paimon/tiering/PaimonLakeCommitter.java   |   1 +
 .../lake/paimon/tiering/PaimonTieringITCase.java   |   4 +-
 .../lake/paimon/tiering/PaimonTieringTest.java     |  51 +++++++++-
 fluss-rpc/src/main/proto/FlussApi.proto            |   2 +
 .../fluss/server/utils/ServerRpcMessageUtils.java  |  19 +++-
 .../alibaba/fluss/server/zk/ZooKeeperClient.java   |   7 +-
 .../fluss/server/zk/data/LakeTableSnapshot.java    |  29 ++++--
 .../server/zk/data/LakeTableSnapshotJsonSerde.java |  24 ++++-
 .../replica/CommitLakeTableSnapshotITCase.java     |   7 +-
 .../zk/data/LakeTableSnapshotJsonSerdeTest.java    |  28 +++++-
 27 files changed, 429 insertions(+), 149 deletions(-)

diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java
index ff148cfb8..36228cb4c 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/metadata/LakeSnapshot.java
@@ -20,6 +20,7 @@ package com.alibaba.fluss.client.metadata;
 import com.alibaba.fluss.annotation.PublicEvolving;
 import com.alibaba.fluss.metadata.TableBucket;
 
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -36,9 +37,17 @@ public class LakeSnapshot {
     // the specific log offset of the snapshot
     private final Map<TableBucket, Long> tableBucketsOffset;
 
-    public LakeSnapshot(long snapshotId, Map<TableBucket, Long> 
tableBucketsOffset) {
+    // the partition name by partition id of this lake snapshot if it
+    // is a partitioned table; empty if it is not a partitioned table
+    private final Map<Long, String> partitionNameById;
+
+    public LakeSnapshot(
+            long snapshotId,
+            Map<TableBucket, Long> tableBucketsOffset,
+            Map<Long, String> partitionNameById) {
         this.snapshotId = snapshotId;
         this.tableBucketsOffset = tableBucketsOffset;
+        this.partitionNameById = partitionNameById;
     }
 
     public long getSnapshotId() {
@@ -46,7 +55,11 @@ public class LakeSnapshot {
     }
 
     public Map<TableBucket, Long> getTableBucketsOffset() {
-        return tableBucketsOffset;
+        return Collections.unmodifiableMap(tableBucketsOffset);
+    }
+
+    public Map<Long, String> getPartitionNameById() {
+        return Collections.unmodifiableMap(partitionNameById);
     }
 
     @Override
@@ -56,6 +69,8 @@ public class LakeSnapshot {
                 + snapshotId
                 + ", tableBucketsOffset="
                 + tableBucketsOffset
+                + ", partitionNameById="
+                + partitionNameById
                 + '}';
     }
 }
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
index bfc36f6fd..d8b932a84 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
@@ -202,6 +202,7 @@ public class ClientRpcMessageUtils {
         long snapshotId = response.getSnapshotId();
         Map<TableBucket, Long> tableBucketsOffset =
                 new HashMap<>(response.getBucketSnapshotsCount());
+        Map<Long, String> partitionNameById = new HashMap<>();
         for (PbLakeSnapshotForBucket pbLakeSnapshotForBucket : 
response.getBucketSnapshotsList()) {
             Long partitionId =
                     pbLakeSnapshotForBucket.hasPartitionId()
@@ -209,9 +210,12 @@ public class ClientRpcMessageUtils {
                             : null;
             TableBucket tableBucket =
                     new TableBucket(tableId, partitionId, 
pbLakeSnapshotForBucket.getBucketId());
+            if (partitionId != null && 
pbLakeSnapshotForBucket.hasPartitionName()) {
+                partitionNameById.put(partitionId, 
pbLakeSnapshotForBucket.getPartitionName());
+            }
             tableBucketsOffset.put(tableBucket, 
pbLakeSnapshotForBucket.getLogOffset());
         }
-        return new LakeSnapshot(snapshotId, tableBucketsOffset);
+        return new LakeSnapshot(snapshotId, tableBucketsOffset, 
partitionNameById);
     }
 
     public static List<FsPathAndFileName> toFsPathAndFileName(
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
index d90c4805b..ae8fdbdb1 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
@@ -31,17 +31,17 @@ public class BucketOffset implements Serializable {
     private final long logOffset;
     private final int bucket;
     private final @Nullable Long partitionId;
-    private final @Nullable String partitionName;
+    private final @Nullable String partitionQualifiedName;
 
     public BucketOffset(
             long logOffset,
             int bucket,
             @Nullable Long partitionId,
-            @Nullable String partitionName) {
+            @Nullable String partitionQualifiedName) {
         this.logOffset = logOffset;
         this.bucket = bucket;
         this.partitionId = partitionId;
-        this.partitionName = partitionName;
+        this.partitionQualifiedName = partitionQualifiedName;
     }
 
     public long getLogOffset() {
@@ -58,8 +58,8 @@ public class BucketOffset implements Serializable {
     }
 
     @Nullable
-    public String getPartitionName() {
-        return partitionName;
+    public String getPartitionQualifiedName() {
+        return partitionQualifiedName;
     }
 
     @Override
@@ -74,6 +74,6 @@ public class BucketOffset implements Serializable {
         return bucket == that.bucket
                 && logOffset == that.logOffset
                 && Objects.equals(partitionId, that.partitionId)
-                && Objects.equals(partitionName, that.partitionName);
+                && Objects.equals(partitionQualifiedName, 
that.partitionQualifiedName);
     }
 }
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
index a0d94b140..2e5f7947f 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
@@ -34,6 +34,10 @@ public class CommittedLakeSnapshot {
     // partition bucket
     private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new 
HashMap<>();
 
+    // partition id -> partition name; will be empty if it is not a partitioned 
table
+    // the partition name is a qualified name in the format: 
key1=value1/key2=value2
+    private final Map<Long, String> qualifiedPartitionNameById = new 
HashMap<>();
+
     public CommittedLakeSnapshot(long lakeSnapshotId) {
         this.lakeSnapshotId = lakeSnapshotId;
     }
@@ -46,14 +50,20 @@ public class CommittedLakeSnapshot {
         logEndOffsets.put(Tuple2.of(null, bucketId), offset);
     }
 
-    public void addPartitionBucket(Long partitionId, int bucketId, long 
offset) {
+    public void addPartitionBucket(
+            Long partitionId, String partitionQualifiedName, int bucketId, 
long offset) {
         logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
+        qualifiedPartitionNameById.put(partitionId, partitionQualifiedName);
     }
 
     public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
         return logEndOffsets;
     }
 
+    public Map<Long, String> getQualifiedPartitionNameById() {
+        return qualifiedPartitionNameById;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) {
@@ -61,12 +71,13 @@ public class CommittedLakeSnapshot {
         }
         CommittedLakeSnapshot that = (CommittedLakeSnapshot) o;
         return lakeSnapshotId == that.lakeSnapshotId
-                && Objects.equals(logEndOffsets, that.logEndOffsets);
+                && Objects.equals(logEndOffsets, that.logEndOffsets)
+                && Objects.equals(qualifiedPartitionNameById, 
that.qualifiedPartitionNameById);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(lakeSnapshotId, logEndOffsets);
+        return Objects.hash(lakeSnapshotId, logEndOffsets, 
qualifiedPartitionNameById);
     }
 
     @Override
@@ -76,6 +87,8 @@ public class CommittedLakeSnapshot {
                 + lakeSnapshotId
                 + ", logEndOffsets="
                 + logEndOffsets
+                + ", partitionNameById="
+                + qualifiedPartitionNameById
                 + '}';
     }
 }
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java
index 5bd8b5363..4575206da 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/ResolvedPartitionSpec.java
@@ -125,6 +125,25 @@ public class ResolvedPartitionSpec {
         return sb.toString();
     }
 
+    public static ResolvedPartitionSpec fromPartitionQualifiedName(String 
qualifiedPartitionName) {
+        // convert from qualified name to ResolvedPartitionSpec
+        List<String> keys = new ArrayList<>();
+        List<String> values = new ArrayList<>();
+
+        String[] keyValuePairs = qualifiedPartitionName.split("/");
+
+        for (String pair : keyValuePairs) {
+            String[] keyValue = pair.split("=", 2);
+            if (keyValue.length != 2) {
+                throw new IllegalArgumentException(
+                        "Invalid partition name format. Expected key=value, 
got: " + pair);
+            }
+            keys.add(keyValue[0]);
+            values.add(keyValue[1]);
+        }
+        return new ResolvedPartitionSpec(keys, values);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (o == null || getClass() != o.getClass()) {
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
index f5fa71209..819e2f09f 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
@@ -60,8 +60,8 @@ public class BucketOffsetJsonSerde
         generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
 
         // serialize partition name
-        if (bucketOffset.getPartitionName() != null) {
-            generator.writeStringField(PARTITION_NAME, 
bucketOffset.getPartitionName());
+        if (bucketOffset.getPartitionQualifiedName() != null) {
+            generator.writeStringField(PARTITION_NAME, 
bucketOffset.getPartitionQualifiedName());
         }
 
         // serialize bucket offset
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java
index f9e827477..50981e33d 100644
--- 
a/fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/metadata/ResolvedPartitionSpecTest.java
@@ -38,6 +38,11 @@ public class ResolvedPartitionSpecTest {
                         new PartitionSpec(Collections.singletonMap("a", "1")));
         assertThat(resolvedPartitionSpec.getPartitionName()).isEqualTo("1");
         
assertThat(resolvedPartitionSpec.getPartitionQualifiedName()).isEqualTo("a=1");
+        assertThat(
+                        ResolvedPartitionSpec.fromPartitionQualifiedName(
+                                
resolvedPartitionSpec.getPartitionQualifiedName()))
+                .isEqualTo(resolvedPartitionSpec);
+
         assertThat(resolvedPartitionSpec.getPartitionKeys())
                 .isEqualTo(Collections.singletonList("a"));
         assertThat(resolvedPartitionSpec.getPartitionValues())
@@ -52,5 +57,9 @@ public class ResolvedPartitionSpecTest {
         
assertThat(resolvedPartitionSpec.getPartitionQualifiedName()).isEqualTo("a=1/b=2");
         
assertThat(resolvedPartitionSpec.getPartitionKeys()).isEqualTo(Arrays.asList("a",
 "b"));
         
assertThat(resolvedPartitionSpec.getPartitionValues()).isEqualTo(Arrays.asList("1",
 "2"));
+        assertThat(
+                        ResolvedPartitionSpec.fromPartitionQualifiedName(
+                                
resolvedPartitionSpec.getPartitionQualifiedName()))
+                .isEqualTo(resolvedPartitionSpec);
     }
 }
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
index 7f267b160..c1913bae2 100644
--- 
a/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
@@ -29,14 +29,15 @@ public class BucketOffsetJsonSerdeTest extends 
JsonSerdeTestBase<BucketOffset> {
     @Override
     protected BucketOffset[] createObjects() {
         return new BucketOffset[] {
-            new BucketOffset(10, 1, 1L, "eu-central$2023$12"), new 
BucketOffset(20, 2, null, null)
+            new BucketOffset(10, 1, 1L, 
"country=eu-central/year=2023/month=12"),
+            new BucketOffset(20, 2, null, null)
         };
     }
 
     @Override
     protected String[] expectedJsons() {
         return new String[] {
-            
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"eu-central$2023$12\",\"log_offset\":10}",
+            
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"country=eu-central/year=2023/month=12\",\"log_offset\":10}",
             "{\"bucket_id\":2,\"log_offset\":20}"
         };
     }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
index 294c0b94f..a9207ca49 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshot.java
@@ -18,28 +18,26 @@
 package com.alibaba.fluss.flink.tiering.committer;
 
 import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.utils.types.Tuple2;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /** A lake snapshot for a Fluss table. */
-public class FlussTableLakeSnapshot {
+class FlussTableLakeSnapshot {
 
     private final long tableId;
 
     private final long lakeSnapshotId;
 
-    private final Map<TableBucket, Long> logEndOffsets;
+    // <table_bucket, partition_name> -> log end offsets;
+    // if the bucket does not belong to a partition, the partition_name is null
+    private final Map<Tuple2<TableBucket, String>, Long> logEndOffsets;
 
-    public FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
-        this(tableId, lakeSnapshotId, new HashMap<>());
-    }
-
-    public FlussTableLakeSnapshot(
-            long tableId, long lakeSnapshotId, Map<TableBucket, Long> 
logEndOffsets) {
+    FlussTableLakeSnapshot(long tableId, long lakeSnapshotId) {
         this.tableId = tableId;
         this.lakeSnapshotId = lakeSnapshotId;
-        this.logEndOffsets = logEndOffsets;
+        this.logEndOffsets = new HashMap<>();
     }
 
     public long tableId() {
@@ -50,12 +48,16 @@ public class FlussTableLakeSnapshot {
         return lakeSnapshotId;
     }
 
-    public Map<TableBucket, Long> logEndOffsets() {
+    public Map<Tuple2<TableBucket, String>, Long> logEndOffsets() {
         return logEndOffsets;
     }
 
     public void addBucketOffset(TableBucket bucket, long offset) {
-        logEndOffsets.put(bucket, offset);
+        logEndOffsets.put(Tuple2.of(bucket, null), offset);
+    }
+
+    public void addPartitionBucketOffset(TableBucket bucket, String 
partitionName, long offset) {
+        logEndOffsets.put(Tuple2.of(bucket, partitionName), offset);
     }
 
     @Override
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index 77a08e83e..e1b88e320 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -21,6 +21,7 @@ import com.alibaba.fluss.client.metadata.MetadataUpdater;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metrics.registry.MetricRegistry;
 import com.alibaba.fluss.rpc.GatewayClientProxy;
@@ -33,8 +34,6 @@ import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
 import com.alibaba.fluss.utils.ExceptionUtils;
 import com.alibaba.fluss.utils.types.Tuple2;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.util.Map;
 
@@ -63,7 +62,7 @@ public class FlussTableLakeSnapshotCommitter implements 
AutoCloseable {
                         metadataUpdater::getCoordinatorServer, rpcClient, 
CoordinatorGateway.class);
     }
 
-    public void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws 
IOException {
+    void commit(FlussTableLakeSnapshot flussTableLakeSnapshot) throws 
IOException {
         try {
             CommitLakeTableSnapshotRequest request =
                     toCommitLakeTableSnapshotRequest(flussTableLakeSnapshot);
@@ -77,10 +76,7 @@ public class FlussTableLakeSnapshotCommitter implements 
AutoCloseable {
         }
     }
 
-    public void commit(
-            long tableId,
-            @Nullable Map<String, Long> partitionIdByName,
-            CommittedLakeSnapshot committedLakeSnapshot)
+    public void commit(long tableId, CommittedLakeSnapshot 
committedLakeSnapshot)
             throws IOException {
         // construct lake snapshot to commit to Fluss
         FlussTableLakeSnapshot flussTableLakeSnapshot =
@@ -89,18 +85,22 @@ public class FlussTableLakeSnapshotCommitter implements 
AutoCloseable {
                 committedLakeSnapshot.getLogEndOffsets().entrySet()) {
             Tuple2<Long, Integer> partitionBucket = entry.getKey();
             TableBucket tableBucket;
-            if (partitionBucket.f0 == null) {
+            Long partitionId = partitionBucket.f0;
+            if (partitionId == null) {
                 tableBucket = new TableBucket(tableId, partitionBucket.f1);
+                flussTableLakeSnapshot.addBucketOffset(tableBucket, 
entry.getValue());
             } else {
-                Long partitionId = partitionBucket.f0;
-                if (partitionId != null) {
-                    tableBucket = new TableBucket(tableId, partitionId, 
partitionBucket.f1);
-                } else {
-                    // let's skip the bucket
-                    continue;
-                }
+                tableBucket = new TableBucket(tableId, partitionId, 
partitionBucket.f1);
+                // the partition name is a qualified partition name in the format:
+                // key1=value1/key2=value2.
+                // we need to convert it to a partition name in the format: 
value1$value2
+                String qualifiedPartitionName =
+                        
committedLakeSnapshot.getQualifiedPartitionNameById().get(partitionId);
+                ResolvedPartitionSpec resolvedPartitionSpec =
+                        
ResolvedPartitionSpec.fromPartitionQualifiedName(qualifiedPartitionName);
+                flussTableLakeSnapshot.addPartitionBucketOffset(
+                        tableBucket, resolvedPartitionSpec.getPartitionName(), 
entry.getValue());
             }
-            flussTableLakeSnapshot.addBucketOffset(tableBucket, 
entry.getValue());
         }
         commit(flussTableLakeSnapshot);
     }
@@ -114,15 +114,19 @@ public class FlussTableLakeSnapshotCommitter implements 
AutoCloseable {
 
         pbLakeTableSnapshotInfo.setTableId(flussTableLakeSnapshot.tableId());
         
pbLakeTableSnapshotInfo.setSnapshotId(flussTableLakeSnapshot.lakeSnapshotId());
-        for (Map.Entry<TableBucket, Long> bucketEndOffsetEntry :
+        for (Map.Entry<Tuple2<TableBucket, String>, Long> bucketEndOffsetEntry 
:
                 flussTableLakeSnapshot.logEndOffsets().entrySet()) {
             PbLakeTableOffsetForBucket pbLakeTableOffsetForBucket =
                     pbLakeTableSnapshotInfo.addBucketsReq();
-            TableBucket tableBucket = bucketEndOffsetEntry.getKey();
+            TableBucket tableBucket = bucketEndOffsetEntry.getKey().f0;
+            String partitionName = bucketEndOffsetEntry.getKey().f1;
             long endOffset = bucketEndOffsetEntry.getValue();
             if (tableBucket.getPartitionId() != null) {
                 
pbLakeTableOffsetForBucket.setPartitionId(tableBucket.getPartitionId());
             }
+            if (partitionName != null) {
+                pbLakeTableOffsetForBucket.setPartitionName(partitionName);
+            }
             pbLakeTableOffsetForBucket.setBucketId(tableBucket.getBucket());
             pbLakeTableOffsetForBucket.setLogEndOffset(endOffset);
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
index ffb7ecb5b..64d78d7c2 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -32,7 +32,7 @@ import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
 import com.alibaba.fluss.lake.committer.LakeCommitter;
 import com.alibaba.fluss.lake.writer.LakeTieringFactory;
 import com.alibaba.fluss.lake.writer.LakeWriter;
-import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
@@ -190,9 +190,9 @@ public class TieringCommitOperator<WriteResult, Committable>
                             .collect(Collectors.toList());
 
             LakeSnapshot flussCurrentLakeSnapshot = 
getLatestLakeSnapshot(tablePath);
-            Map<TableBucket, Long> logOffsets =
-                    mergeTableBucketOffsets(flussCurrentLakeSnapshot, 
committableWriteResults);
-
+            Map<String, String> logOffsetsProperty =
+                    toBucketOffsetsProperty(
+                            tablePath, flussCurrentLakeSnapshot, 
committableWriteResults);
             // to committable
             Committable committable = 
lakeCommitter.toCommittable(writeResults);
             // before commit to lake, check fluss not missing any lake 
snapshot committed by fluss
@@ -203,32 +203,86 @@ public class TieringCommitOperator<WriteResult, 
Committable>
                     flussCurrentLakeSnapshot == null
                             ? null
                             : flussCurrentLakeSnapshot.getSnapshotId());
-            long committedSnapshotId =
-                    lakeCommitter.commit(committable, 
toBucketOffsetsProperty(logOffsets));
+            long committedSnapshotId = lakeCommitter.commit(committable, 
logOffsetsProperty);
             // commit to fluss
-            Map<TableBucket, Long> logEndOffsets = new HashMap<>();
+            FlussTableLakeSnapshot flussTableLakeSnapshot =
+                    new FlussTableLakeSnapshot(tableId, committedSnapshotId);
             for (TableBucketWriteResult<WriteResult> writeResult : 
committableWriteResults) {
-                logEndOffsets.put(writeResult.tableBucket(), 
writeResult.logEndOffset());
+                TableBucket tableBucket = writeResult.tableBucket();
+                if (writeResult.tableBucket().getPartitionId() == null) {
+                    flussTableLakeSnapshot.addBucketOffset(tableBucket, 
writeResult.logEndOffset());
+                } else {
+                    flussTableLakeSnapshot.addPartitionBucketOffset(
+                            tableBucket, writeResult.partitionName(), 
writeResult.logEndOffset());
+                }
             }
-            flussTableLakeSnapshotCommitter.commit(
-                    new FlussTableLakeSnapshot(tableId, committedSnapshotId, 
logEndOffsets));
+            flussTableLakeSnapshotCommitter.commit(flussTableLakeSnapshot);
             return committable;
         }
     }
 
+    /**
+     * Merge the log offsets of latest snapshot with current written bucket 
offsets to get full log
+     * offsets.
+     */
+    private Map<String, String> toBucketOffsetsProperty(
+            TablePath tablePath,
+            @Nullable LakeSnapshot latestLakeSnapshot,
+            List<TableBucketWriteResult<WriteResult>> currentWriteResults)
+            throws Exception {
+        // first of all, we need to merge latest lake snapshot with current 
write results
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
+        Map<Long, String> partitionNameById = new HashMap<>();
+        if (latestLakeSnapshot != null) {
+            tableBucketOffsets = new 
HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
+            partitionNameById = new 
HashMap<>(latestLakeSnapshot.getPartitionNameById());
+        }
+
+        for (TableBucketWriteResult<WriteResult> tableBucketWriteResult : 
currentWriteResults) {
+            tableBucketOffsets.put(
+                    tableBucketWriteResult.tableBucket(), 
tableBucketWriteResult.logEndOffset());
+            if (tableBucketWriteResult.tableBucket().getPartitionId() != null
+                    && tableBucketWriteResult.partitionName() != null) {
+                partitionNameById.put(
+                        tableBucketWriteResult.tableBucket().getPartitionId(),
+                        tableBucketWriteResult.partitionName());
+            }
+        }
+
+        List<String> partitionKeys = new ArrayList<>();
+        if (!partitionNameById.isEmpty()) {
+            partitionKeys = 
admin.getTableInfo(tablePath).get().getPartitionKeys();
+        }
+
+        // then, serialize the bucket offsets, partition name by id
+        return toBucketOffsetsProperty(tableBucketOffsets, partitionNameById, 
partitionKeys);
+    }
+
     public static Map<String, String> toBucketOffsetsProperty(
-            Map<TableBucket, Long> tableBucketOffsets) throws IOException {
+            Map<TableBucket, Long> tableBucketOffsets,
+            Map<Long, String> partitionNameById,
+            List<String> partitionKeys)
+            throws IOException {
         StringWriter sw = new StringWriter();
         try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
             gen.writeStartArray();
             for (Map.Entry<TableBucket, Long> entry : 
tableBucketOffsets.entrySet()) {
+                Long partitionId = entry.getKey().getPartitionId();
+                String partitionQualifiedName = null;
+                if (partitionId != null) {
+                    // the partitionName looks like 2025$12$03; we need to convert it to
+                    // the qualified name year=2025/month=12/day=03
+                    partitionQualifiedName =
+                            ResolvedPartitionSpec.fromPartitionName(
+                                            partitionKeys, 
partitionNameById.get(partitionId))
+                                    .getPartitionQualifiedName();
+                }
                 BucketOffsetJsonSerde.INSTANCE.serialize(
                         new BucketOffset(
                                 entry.getValue(),
                                 entry.getKey().getBucket(),
                                 entry.getKey().getPartitionId(),
-                                // todo: fill partition name in #1448
-                                null),
+                                partitionQualifiedName),
                         gen);
             }
             gen.writeEndArray();
@@ -240,24 +294,6 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         };
     }
 
-    /**
-     * Merge the log offsets of latest snapshot with current written bucket 
offsets to get full log
-     * offsets.
-     */
-    private Map<TableBucket, Long> mergeTableBucketOffsets(
-            @Nullable LakeSnapshot latestLakeSnapshot,
-            List<TableBucketWriteResult<WriteResult>> currentWriteResults) {
-        Map<TableBucket, Long> tableBucketOffsets =
-                latestLakeSnapshot == null
-                        ? new HashMap<>()
-                        : new 
HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
-        for (TableBucketWriteResult<WriteResult> tableBucketWriteResult : 
currentWriteResults) {
-            tableBucketOffsets.put(
-                    tableBucketWriteResult.tableBucket(), 
tableBucketWriteResult.logEndOffset());
-        }
-        return tableBucketOffsets;
-    }
-
     @Nullable
     private LakeSnapshot getLatestLakeSnapshot(TablePath tablePath) throws 
Exception {
         LakeSnapshot flussCurrentLakeSnapshot;
@@ -293,17 +329,8 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         if (missingCommittedSnapshot != null) {
             // commit this missing snapshot to fluss
             TableInfo tableInfo = admin.getTableInfo(tablePath).get();
-            Map<String, Long> partitionIdByName = null;
-            if (tableInfo.isPartitioned()) {
-                partitionIdByName =
-                        admin.listPartitionInfos(tablePath).get().stream()
-                                .collect(
-                                        Collectors.toMap(
-                                                
PartitionInfo::getPartitionName,
-                                                
PartitionInfo::getPartitionId));
-            }
             flussTableLakeSnapshotCommitter.commit(
-                    tableInfo.getTableId(), partitionIdByName, 
missingCommittedSnapshot);
+                    tableInfo.getTableId(), missingCommittedSnapshot);
             // abort this committable to delete the written files
             lakeCommitter.abort(committable);
             throw new IllegalStateException(
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResult.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResult.java
index 2d06898fe..7260fbf8f 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResult.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResult.java
@@ -39,6 +39,9 @@ public class TableBucketWriteResult<WriteResult> implements 
Serializable {
 
     private final TableBucket tableBucket;
 
+    // null when the bucket is not for a partition
+    @Nullable private final String partitionName;
+
     // will be null when no any data write, such as for tiering a empty log 
split
     @Nullable private final WriteResult writeResult;
 
@@ -53,11 +56,13 @@ public class TableBucketWriteResult<WriteResult> implements 
Serializable {
     public TableBucketWriteResult(
             TablePath tablePath,
             TableBucket tableBucket,
+            @Nullable String partitionName,
             @Nullable WriteResult writeResult,
             long logEndOffset,
             int numberOfWriteResults) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
+        this.partitionName = partitionName;
         this.writeResult = writeResult;
         this.logEndOffset = logEndOffset;
         this.numberOfWriteResults = numberOfWriteResults;
@@ -71,6 +76,11 @@ public class TableBucketWriteResult<WriteResult> implements 
Serializable {
         return tableBucket;
     }
 
+    @Nullable
+    public String partitionName() {
+        return partitionName;
+    }
+
     @Nullable
     public WriteResult writeResult() {
         return writeResult;
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
index 6af8ab53b..486b361af 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializer.java
@@ -65,6 +65,7 @@ public class TableBucketWriteResultSerializer<WriteResult>
         if (tableBucket.getPartitionId() != null) {
             out.writeBoolean(true);
             out.writeLong(tableBucket.getPartitionId());
+            out.writeUTF(tableBucketWriteResult.partitionName());
         } else {
             out.writeBoolean(false);
         }
@@ -107,8 +108,10 @@ public class TableBucketWriteResultSerializer<WriteResult>
         // deserialize bucket
         long tableId = in.readLong();
         Long partitionId = null;
+        String partitionName = null;
         if (in.readBoolean()) {
             partitionId = in.readLong();
+            partitionName = in.readUTF();
         }
         int bucketId = in.readInt();
         TableBucket tableBucket = new TableBucket(tableId, partitionId, 
bucketId);
@@ -129,6 +132,11 @@ public class TableBucketWriteResultSerializer<WriteResult>
         // deserialize number of write results
         int numberOfWriteResults = in.readInt();
         return new TableBucketWriteResult<>(
-                tablePath, tableBucket, writeResult, logEndOffset, 
numberOfWriteResults);
+                tablePath,
+                tableBucket,
+                partitionName,
+                writeResult,
+                logEndOffset,
+                numberOfWriteResults);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
index 80cbe664a..e3b0107a4 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/source/TieringSplitReader.java
@@ -279,9 +279,13 @@ public class TieringSplitReader<WriteResult>
                     // todo: should unsubscribe the log split if unsubscribe 
bucket for
                     // un-partitioned table is supported
                 }
+                TieringSplit currentTieringSplit = 
currentTableSplitsByBucket.remove(bucket);
+                String currentSplitId = currentTieringSplit.splitId();
                 // put write result of the bucket
-                writeResults.put(bucket, completeLakeWriter(bucket, 
stoppingOffset));
-                String currentSplitId = 
currentTableSplitsByBucket.remove(bucket).splitId();
+                writeResults.put(
+                        bucket,
+                        completeLakeWriter(
+                                bucket, 
currentTieringSplit.getPartitionName(), stoppingOffset));
                 // put split of the bucket
                 finishedSplitIds.put(bucket, currentSplitId);
                 LOG.info("Split {} has been finished.", currentSplitId);
@@ -312,13 +316,15 @@ public class TieringSplitReader<WriteResult>
     }
 
     private TableBucketWriteResult<WriteResult> completeLakeWriter(
-            TableBucket bucket, long logEndOffset) throws IOException {
+            TableBucket bucket, @Nullable String partitionName, long 
logEndOffset)
+            throws IOException {
         LakeWriter<WriteResult> lakeWriter = lakeWriters.remove(bucket);
         WriteResult writeResult = lakeWriter.complete();
         lakeWriter.close();
         return toTableBucketWriteResult(
                 currentTablePath,
                 bucket,
+                partitionName,
                 writeResult,
                 logEndOffset,
                 checkNotNull(currentTableNumberOfSplits));
@@ -335,6 +341,7 @@ public class TieringSplitReader<WriteResult>
                     toTableBucketWriteResult(
                             logSplit.getTablePath(),
                             tableBucket,
+                            logSplit.getPartitionName(),
                             null,
                             logSplit.getStoppingOffset(),
                             logSplit.getNumberOfSplits()));
@@ -355,7 +362,8 @@ public class TieringSplitReader<WriteResult>
         long logEndOffset = currentSnapshotSplit.getLogOffsetOfSnapshot();
         String splitId = 
currentTableSplitsByBucket.remove(tableBucket).splitId();
         TableBucketWriteResult<WriteResult> writeResult =
-                completeLakeWriter(tableBucket, logEndOffset);
+                completeLakeWriter(
+                        tableBucket, currentSnapshotSplit.getPartitionName(), 
logEndOffset);
         closeCurrentSnapshotSplit();
         mayFinishCurrentTable();
         return new TableBucketWriteResultWithSplitIds(
@@ -472,11 +480,12 @@ public class TieringSplitReader<WriteResult>
     private TableBucketWriteResult<WriteResult> toTableBucketWriteResult(
             TablePath tablePath,
             TableBucket tableBucket,
+            @Nullable String partitionName,
             @Nullable WriteResult writeResult,
             long endLogOffset,
             int numberOfSplits) {
         return new TableBucketWriteResult<>(
-                tablePath, tableBucket, writeResult, endLogOffset, 
numberOfSplits);
+                tablePath, tableBucket, partitionName, writeResult, 
endLogOffset, numberOfSplits);
     }
 
     private class TableBucketWriteResultWithSplitIds
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index 26b3580dd..c73cd4999 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -20,7 +20,7 @@ package com.alibaba.fluss.flink.tiering.committer;
 import com.alibaba.fluss.client.metadata.LakeSnapshot;
 import com.alibaba.fluss.flink.utils.FlinkTestBase;
 import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
-import com.alibaba.fluss.metadata.PartitionInfo;
+import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TablePath;
 
@@ -34,7 +34,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static 
com.alibaba.fluss.record.TestData.DATA1_PARTITIONED_TABLE_DESCRIPTOR;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
@@ -71,14 +70,15 @@ class FlussTableLakeSnapshotCommitterTest extends 
FlinkTestBase {
                                 ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
                                 : DATA1_TABLE_DESCRIPTOR);
 
-        List<Long> partitions;
-        Map<String, Long> partitionNameAndIds = Collections.emptyMap();
+        List<String> partitions;
+        Map<String, Long> partitionNameAndIds = new HashMap<>();
+        Map<Long, String> expectedPartitionNameById = new HashMap<>();
         if (!isPartitioned) {
             FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
             partitions = Collections.singletonList(null);
         } else {
             partitionNameAndIds = 
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
-            partitions = new ArrayList<>(partitionNameAndIds.values());
+            partitions = new ArrayList<>(partitionNameAndIds.keySet());
         }
 
         CommittedLakeSnapshot committedLakeSnapshot = new 
CommittedLakeSnapshot(3);
@@ -86,34 +86,37 @@ class FlussTableLakeSnapshotCommitterTest extends 
FlinkTestBase {
         Map<TableBucket, Long> expectedOffsets = new HashMap<>();
         for (int bucket = 0; bucket < 3; bucket++) {
             long bucketOffset = bucket * bucket;
-            for (Long partition : partitions) {
-                if (partition == null) {
+            for (String partitionName : partitions) {
+                if (partitionName == null) {
                     committedLakeSnapshot.addBucket(bucket, bucketOffset);
                     expectedOffsets.put(new TableBucket(tableId, bucket), 
bucketOffset);
                 } else {
-                    committedLakeSnapshot.addPartitionBucket(partition, 
bucket, bucketOffset);
-                    expectedOffsets.put(new TableBucket(tableId, partition, 
bucket), bucketOffset);
+                    long partitionId = partitionNameAndIds.get(partitionName);
+                    committedLakeSnapshot.addPartitionBucket(
+                            partitionId,
+                            ResolvedPartitionSpec.fromPartitionName(
+                                            Collections.singletonList("a"), 
partitionName)
+                                    .getPartitionQualifiedName(),
+                            bucket,
+                            bucketOffset);
+                    expectedOffsets.put(
+                            new TableBucket(tableId, partitionId, bucket), 
bucketOffset);
+                    expectedPartitionNameById.put(partitionId, partitionName);
                 }
             }
         }
 
-        Map<String, Long> partitionIdByName = null;
-        if (isPartitioned) {
-            partitionIdByName =
-                    admin.listPartitionInfos(tablePath).get().stream()
-                            .collect(
-                                    Collectors.toMap(
-                                            PartitionInfo::getPartitionName,
-                                            PartitionInfo::getPartitionId));
-        }
-
         // commit offsets
-        flussTableLakeSnapshotCommitter.commit(tableId, partitionIdByName, 
committedLakeSnapshot);
+        flussTableLakeSnapshotCommitter.commit(tableId, committedLakeSnapshot);
         LakeSnapshot lakeSnapshot = 
admin.getLatestLakeSnapshot(tablePath).get();
         assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(3);
 
         // get and check the offsets
         Map<TableBucket, Long> bucketLogOffsets = 
lakeSnapshot.getTableBucketsOffset();
         assertThat(bucketLogOffsets).isEqualTo(expectedOffsets);
+
+        // check partition name
+        Map<Long, String> partitionNameById = 
lakeSnapshot.getPartitionNameById();
+        assertThat(partitionNameById).isEqualTo(expectedPartitionNameById);
     }
 }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index e5e47a8ff..53721c4e1 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -26,6 +26,7 @@ import 
com.alibaba.fluss.flink.tiering.event.FinishedTieringEvent;
 import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult;
 import com.alibaba.fluss.flink.utils.FlinkTestBase;
 import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
+import com.alibaba.fluss.metadata.ResolvedPartitionSpec;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.utils.types.Tuple2;
@@ -180,7 +181,12 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 long currentOffset = offset++;
                 committerOperator.processElement(
                         createTableBucketWriteResultStreamRecord(
-                                tablePath, tableBucket, 1, currentOffset, 
numberOfWriteResults));
+                                tablePath,
+                                tableBucket,
+                                partitionIdAndNameEntry.getKey(),
+                                1,
+                                currentOffset,
+                                numberOfWriteResults));
                 expectedLogEndOffsets.put(tableBucket, currentOffset);
             }
             if (bucket == 2) {
@@ -257,9 +263,10 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 tablePath,
                 tableId,
                 2,
-                getExpectedLogEndOffsets(tableId, mockCommittedSnapshot, 
Collections.emptyMap()),
+                getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+                mockCommittedSnapshot.getQualifiedPartitionNameById(),
                 String.format(
-                        "The current Fluss's lake snapshot %d is less than 
lake actual snapshot %d committed by Fluss for table: {tablePath=%s, 
tableId=%d},"
+                        "The current Fluss's lake snapshot %s is less than 
lake actual snapshot %d committed by Fluss for table: {tablePath=%s, 
tableId=%d},"
                                 + " missing snapshot: %s.",
                         null,
                         mockCommittedSnapshot.getLakeSnapshotId(),
@@ -309,7 +316,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 TableBucket tableBucket = new TableBucket(tableId, 
partitionId, bucket);
                 committerOperator.processElement(
                         createTableBucketWriteResultStreamRecord(
-                                tablePath, tableBucket, 3, 3, 
numberOfWriteResults));
+                                tablePath, tableBucket, partitionName, 3, 3, 
numberOfWriteResults));
             }
         }
 
@@ -317,9 +324,10 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 tablePath,
                 tableId,
                 3,
-                getExpectedLogEndOffsets(tableId, mockCommittedSnapshot, 
Collections.emptyMap()),
+                getExpectedLogEndOffsets(tableId, mockCommittedSnapshot),
+                mockCommittedSnapshot.getQualifiedPartitionNameById(),
                 String.format(
-                        "The current Fluss's lake snapshot %d is less than 
lake actual snapshot %d committed by Fluss for table: {tablePath=%s, 
tableId=%d}, missing snapshot: %s.",
+                        "The current Fluss's lake snapshot %s is less than 
lake actual snapshot %d committed by Fluss for table: {tablePath=%s, 
tableId=%d}, missing snapshot: %s.",
                         null,
                         mockCommittedSnapshot.getLakeSnapshotId(),
                         tablePath,
@@ -334,7 +342,13 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                 if (partition == null) {
                     mockCommittedSnapshot.addBucket(bucket, bucket + 1);
                 } else {
-                    mockCommittedSnapshot.addPartitionBucket(partition, 
bucket, bucket + 1);
+                    mockCommittedSnapshot.addPartitionBucket(
+                            partition,
+                            ResolvedPartitionSpec.fromPartitionValue(
+                                            "partition_key", "partition-" + 
partition)
+                                    .getPartitionQualifiedName(),
+                            bucket,
+                            bucket + 1);
                 }
             }
         }
@@ -342,9 +356,7 @@ class TieringCommitOperatorTest extends FlinkTestBase {
     }
 
     private Map<TableBucket, Long> getExpectedLogEndOffsets(
-            long tableId,
-            CommittedLakeSnapshot committedLakeSnapshot,
-            Map<String, Long> partitionIdByName) {
+            long tableId, CommittedLakeSnapshot committedLakeSnapshot) {
         Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
         for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
                 committedLakeSnapshot.getLogEndOffsets().entrySet()) {
@@ -368,10 +380,23 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                     @Nullable Integer writeResult,
                     long logEndOffset,
                     int numberOfWriteResults) {
+        return createTableBucketWriteResultStreamRecord(
+                tablePath, tableBucket, null, writeResult, logEndOffset, 
numberOfWriteResults);
+    }
+
+    private StreamRecord<TableBucketWriteResult<TestingWriteResult>>
+            createTableBucketWriteResultStreamRecord(
+                    TablePath tablePath,
+                    TableBucket tableBucket,
+                    @Nullable String partitionName,
+                    @Nullable Integer writeResult,
+                    long logEndOffset,
+                    int numberOfWriteResults) {
         TableBucketWriteResult<TestingWriteResult> tableBucketWriteResult =
                 new TableBucketWriteResult<>(
                         tablePath,
                         tableBucket,
+                        partitionName,
                         writeResult == null ? null : new 
TestingWriteResult(writeResult),
                         logEndOffset,
                         numberOfWriteResults);
@@ -408,11 +433,13 @@ class TieringCommitOperatorTest extends FlinkTestBase {
             long tableId,
             long expectedSnapshotId,
             Map<TableBucket, Long> expectedLogEndOffsets,
+            Map<Long, String> expectedPartitionIdByName,
             String failedReason)
             throws Exception {
         LakeSnapshot lakeSnapshot = 
admin.getLatestLakeSnapshot(tablePath).get();
         assertThat(lakeSnapshot.getSnapshotId()).isEqualTo(expectedSnapshotId);
         
assertThat(lakeSnapshot.getTableBucketsOffset()).isEqualTo(expectedLogEndOffsets);
+        
assertThat(lakeSnapshot.getPartitionNameById()).isEqualTo(expectedPartitionIdByName);
 
         // check the tableId has been send to mark failed
         List<OperatorEvent> operatorEvents = 
mockOperatorEventGateway.getEventsSent();
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
index d36ad40d3..d7ef1dd0c 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/source/TableBucketWriteResultSerializerTest.java
@@ -40,9 +40,11 @@ class TableBucketWriteResultSerializerTest {
         TestingWriteResult testingWriteResult = new TestingWriteResult(2);
         TablePath tablePath = TablePath.of("db1", "tb1");
         TableBucket tableBucket =
-                isPartitioned ? new TableBucket(1, 2) : new TableBucket(1, 
1000L, 2);
+                isPartitioned ? new TableBucket(1, 1000L, 2) : new 
TableBucket(1, 2);
+        String partitionName = isPartitioned ? "partition1" : null;
         TableBucketWriteResult<TestingWriteResult> tableBucketWriteResult =
-                new TableBucketWriteResult<>(tablePath, tableBucket, 
testingWriteResult, 10, 20);
+                new TableBucketWriteResult<>(
+                        tablePath, tableBucket, partitionName, 
testingWriteResult, 10, 20);
 
         // test serialize and deserialize
         byte[] serialized = 
tableBucketWriteResultSerializer.serialize(tableBucketWriteResult);
@@ -52,6 +54,7 @@ class TableBucketWriteResultSerializerTest {
 
         assertThat(deserialized.tablePath()).isEqualTo(tablePath);
         assertThat(deserialized.tableBucket()).isEqualTo(tableBucket);
+        assertThat(deserialized.partitionName()).isEqualTo(partitionName);
         TestingWriteResult deserializedWriteResult = 
deserialized.writeResult();
         assertThat(deserializedWriteResult).isNotNull();
         assertThat(deserializedWriteResult.getWriteResult())
@@ -59,13 +62,15 @@ class TableBucketWriteResultSerializerTest {
         assertThat(deserialized.numberOfWriteResults()).isEqualTo(20);
 
         // verify when writeResult is null
-        tableBucketWriteResult = new TableBucketWriteResult<>(tablePath, 
tableBucket, null, 20, 30);
+        tableBucketWriteResult =
+                new TableBucketWriteResult<>(tablePath, tableBucket, 
partitionName, null, 20, 30);
         serialized = 
tableBucketWriteResultSerializer.serialize(tableBucketWriteResult);
         deserialized =
                 tableBucketWriteResultSerializer.deserialize(
                         tableBucketWriteResultSerializer.getVersion(), 
serialized);
         assertThat(deserialized.tablePath()).isEqualTo(tablePath);
         assertThat(deserialized.tableBucket()).isEqualTo(tableBucket);
+        assertThat(deserialized.partitionName()).isEqualTo(partitionName);
         assertThat(deserialized.writeResult()).isNull();
         assertThat(deserialized.numberOfWriteResults()).isEqualTo(30);
     }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index 8e16e0fc4..08bcae786 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -154,6 +154,7 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
                 if (bucketOffset.getPartitionId() != null) {
                     committedLakeSnapshot.addPartitionBucket(
                             bucketOffset.getPartitionId(),
+                            bucketOffset.getPartitionQualifiedName(),
                             bucketOffset.getBucket(),
                             bucketOffset.getLogOffset());
                 } else {
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 576d47229..0bccd2fb7 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -165,8 +165,8 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
                         put(
                                 FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
                                 "["
-                                        + 
"{\"partition_id\":0,\"bucket_id\":0,\"log_offset\":3},"
-                                        + 
"{\"partition_id\":1,\"bucket_id\":0,\"log_offset\":3}"
+                                        + 
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+                                        + 
"{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
                                         + "]");
                     }
                 };
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index 16984f7fe..93fd23a2f 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -62,11 +62,13 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
 import static 
com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
+import static 
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
 import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -180,7 +182,11 @@ class PaimonTieringTest {
                             committableSerializer.getVersion(), serialized);
             long snapshot =
                     lakeCommitter.commit(
-                            paimonCommittable, 
toBucketOffsetsProperty(tableBucketOffsets));
+                            paimonCommittable,
+                            toBucketOffsetsProperty(
+                                    tableBucketOffsets,
+                                    partitionIdAndName,
+                                    getPartitionKeys(tablePath)));
             assertThat(snapshot).isEqualTo(1);
         }
 
@@ -271,7 +277,12 @@ class PaimonTieringTest {
                 createLakeCommitter(tablePath)) {
             PaimonCommittable committable = 
lakeCommitter.toCommittable(paimonWriteResults);
             long snapshot =
-                    lakeCommitter.commit(committable, 
toBucketOffsetsProperty(tableBucketOffsets));
+                    lakeCommitter.commit(
+                            committable,
+                            toBucketOffsetsProperty(
+                                    tableBucketOffsets,
+                                    partitionIdAndName,
+                                    getPartitionKeys(tablePath)));
             assertThat(snapshot).isEqualTo(1);
         }
 
@@ -296,7 +307,7 @@ class PaimonTieringTest {
 
         // Test data for different three-level partitions using $ separator
         Map<Long, String> partitionIdAndName =
-                new HashMap<Long, String>() {
+                new LinkedHashMap<Long, String>() {
                     {
                         put(1L, "us-east$2024$01");
                         put(2L, "eu-central$2023$12");
@@ -324,14 +335,27 @@ class PaimonTieringTest {
         }
 
         // Commit all data
+        long snapshot;
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
                 createLakeCommitter(tablePath)) {
             PaimonCommittable committable = 
lakeCommitter.toCommittable(paimonWriteResults);
-            long snapshot =
-                    lakeCommitter.commit(committable, 
toBucketOffsetsProperty(tableBucketOffsets));
+            snapshot =
+                    lakeCommitter.commit(
+                            committable,
+                            toBucketOffsetsProperty(
+                                    tableBucketOffsets,
+                                    partitionIdAndName,
+                                    getPartitionKeys(tablePath)));
             assertThat(snapshot).isEqualTo(1);
         }
 
+        // check fluss offsets in paimon snapshot property
+        String offsetProperty = getSnapshotLogOffsetProperty(tablePath, 
snapshot);
+        assertThat(offsetProperty)
+                .isEqualTo(
+                        
"[{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"region=us-east/year=2024/month=01\",\"log_offset\":2},"
+                                + 
"{\"partition_id\":2,\"bucket_id\":0,\"partition_name\":\"region=eu-central/year=2023/month=12\",\"log_offset\":2}]");
+
         // Verify data for each partition
         for (String partition : partitionIdAndName.values()) {
             List<LogRecord> expectRecords = recordsByPartition.get(partition);
@@ -770,4 +794,21 @@ class PaimonTieringTest {
         paimonCatalog.createDatabase(tablePath.getDatabaseName(), true);
         paimonCatalog.createTable(toPaimon(tablePath), 
paimonSchemaBuilder.build(), true);
     }
+
+    private String getSnapshotLogOffsetProperty(TablePath tablePath, long 
snapshotId)
+            throws Exception {
+        Identifier identifier = toPaimon(tablePath);
+        FileStoreTable fileStoreTable = (FileStoreTable) 
paimonCatalog.getTable(identifier);
+        return fileStoreTable
+                .snapshotManager()
+                .snapshot(snapshotId)
+                .properties()
+                .get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+    }
+
+    private List<String> getPartitionKeys(TablePath tablePath) throws 
Exception {
+        Identifier identifier = toPaimon(tablePath);
+        FileStoreTable fileStoreTable = (FileStoreTable) 
paimonCatalog.getTable(identifier);
+        return fileStoreTable.partitionKeys();
+    }
 }
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index 246dbe89e..bf2f9aaa6 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -448,6 +448,7 @@ message PbLakeTableOffsetForBucket {
   required int32 bucket_id = 2;
   optional int64 log_start_offset = 3;
   optional int64 log_end_offset = 4;
+  optional string partition_name = 5;
 }
 
 message CommitLakeTableSnapshotResponse {
@@ -773,6 +774,7 @@ message PbLakeSnapshotForBucket {
   optional int64 partition_id = 1;
   required int32 bucket_id = 2;
   optional int64 log_offset = 3;
+  optional string partition_name = 4;
 }
 
 message PbRemotePathAndLocalFile {
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
index 034163493..a1363e6b2 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
@@ -1427,6 +1427,7 @@ public class ServerRpcMessageUtils {
             long snapshotId = pdLakeTableSnapshotInfo.getSnapshotId();
             Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
             Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+            Map<Long, String> partitionNameByPartitionId = new HashMap<>();
 
             for (PbLakeTableOffsetForBucket lakeTableOffsetForBucket :
                     pdLakeTableSnapshotInfo.getBucketsReqsList()) {
@@ -1447,11 +1448,20 @@ public class ServerRpcMessageUtils {
                                 : null;
                 bucketLogStartOffset.put(tableBucket, logStartOffset);
                 bucketLogEndOffset.put(tableBucket, logEndOffset);
+
+                if (lakeTableOffsetForBucket.hasPartitionName()) {
+                    partitionNameByPartitionId.put(
+                            partitionId, 
lakeTableOffsetForBucket.getPartitionName());
+                }
             }
             lakeTableInfoByTableId.put(
                     tableId,
                     new LakeTableSnapshot(
-                            snapshotId, tableId, bucketLogStartOffset, 
bucketLogEndOffset));
+                            snapshotId,
+                            tableId,
+                            bucketLogStartOffset,
+                            bucketLogEndOffset,
+                            partitionNameByPartitionId));
         }
         return new CommitLakeTableSnapshotData(lakeTableInfoByTableId);
     }
@@ -1517,6 +1527,13 @@ public class ServerRpcMessageUtils {
                     .setLogOffset(logEndLogOffsetEntry.getValue());
             if (tableBucket.getPartitionId() != null) {
                 
pbLakeSnapshotForBucket.setPartitionId(tableBucket.getPartitionId());
+                String partitionName =
+                        lakeTableSnapshot
+                                .getPartitionNameIdByPartitionId()
+                                .get(tableBucket.getPartitionId());
+                if (partitionName != null) {
+                    pbLakeSnapshotForBucket.setPartitionName(partitionName);
+                }
             }
         }
 
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
index 2d0500f15..f1899ffec 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
@@ -789,12 +789,17 @@ public class ZooKeeperClient implements AutoCloseable {
                     new HashMap<>(previous.getBucketLogEndOffset());
             
bucketLogEndOffset.putAll(lakeTableSnapshot.getBucketLogEndOffset());
 
+            Map<Long, String> partitionNameById =
+                    new HashMap<>(previous.getPartitionNameIdByPartitionId());
+            
partitionNameById.putAll(lakeTableSnapshot.getPartitionNameIdByPartitionId());
+
             lakeTableSnapshot =
                     new LakeTableSnapshot(
                             lakeTableSnapshot.getSnapshotId(),
                             lakeTableSnapshot.getTableId(),
                             bucketLogStartOffset,
-                            bucketLogEndOffset);
+                            bucketLogEndOffset,
+                            partitionNameById);
             zkClient.setData().forPath(path, 
LakeTableZNode.encode(lakeTableSnapshot));
         } else {
             zkClient.create()
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshot.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshot.java
index 8bf9bec94..96dfa3eb1 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshot.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshot.java
@@ -37,15 +37,21 @@ public class LakeTableSnapshot {
     private final Map<TableBucket, Long> bucketLogStartOffset;
     private final Map<TableBucket, Long> bucketLogEndOffset;
 
+    // mapping from partition id to partition name; empty if the table is not
+    // partitioned
+    private final Map<Long, String> partitionNameIdByPartitionId;
+
     public LakeTableSnapshot(
             long snapshotId,
             long tableId,
             Map<TableBucket, Long> bucketLogStartOffset,
-            Map<TableBucket, Long> bucketLogEndOffset) {
+            Map<TableBucket, Long> bucketLogEndOffset,
+            Map<Long, String> partitionNameIdByPartitionId) {
         this.snapshotId = snapshotId;
         this.tableId = tableId;
         this.bucketLogStartOffset = bucketLogStartOffset;
         this.bucketLogEndOffset = bucketLogEndOffset;
+        this.partitionNameIdByPartitionId = partitionNameIdByPartitionId;
     }
 
     public long getSnapshotId() {
@@ -80,24 +86,31 @@ public class LakeTableSnapshot {
         return bucketLogStartOffset;
     }
 
+    public Map<Long, String> getPartitionNameIdByPartitionId() {
+        return partitionNameIdByPartitionId;
+    }
+
     @Override
     public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof LakeTableSnapshot)) {
+        if (o == null || getClass() != o.getClass()) {
             return false;
         }
         LakeTableSnapshot that = (LakeTableSnapshot) o;
         return snapshotId == that.snapshotId
                 && tableId == that.tableId
                 && Objects.equals(bucketLogStartOffset, 
that.bucketLogStartOffset)
-                && Objects.equals(bucketLogEndOffset, that.bucketLogEndOffset);
+                && Objects.equals(bucketLogEndOffset, that.bucketLogEndOffset)
+                && Objects.equals(partitionNameIdByPartitionId, 
that.partitionNameIdByPartitionId);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(snapshotId, tableId, bucketLogStartOffset, 
bucketLogEndOffset);
+        return Objects.hash(
+                snapshotId,
+                tableId,
+                bucketLogStartOffset,
+                bucketLogEndOffset,
+                partitionNameIdByPartitionId);
     }
 
     @Override
@@ -111,6 +124,8 @@ public class LakeTableSnapshot {
                 + bucketLogStartOffset
                 + ", bucketLogEndOffset="
                 + bucketLogEndOffset
+                + ", partitionNameIdByPartitionId="
+                + partitionNameIdByPartitionId
                 + '}';
     }
 }
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
index a4f539149..6a1df0bea 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerde.java
@@ -43,6 +43,7 @@ public class LakeTableSnapshotJsonSerde
     private static final String BUCKET_ID = "bucket_id";
     private static final String LOG_START_OFFSET = "log_start_offset";
     private static final String LOG_END_OFFSET = "log_end_offset";
+    private static final String PARTITION_NAME = "partition_name";
 
     private static final int VERSION = 1;
 
@@ -55,11 +56,19 @@ public class LakeTableSnapshotJsonSerde
         generator.writeNumberField(TABLE_ID, lakeTableSnapshot.getTableId());
 
         generator.writeArrayFieldStart(BUCKETS);
-        for (TableBucket tableBucket : 
lakeTableSnapshot.getBucketLogStartOffset().keySet()) {
+        for (TableBucket tableBucket : 
lakeTableSnapshot.getBucketLogEndOffset().keySet()) {
             generator.writeStartObject();
 
             if (tableBucket.getPartitionId() != null) {
                 generator.writeNumberField(PARTITION_ID, 
tableBucket.getPartitionId());
+                // include the partition name when it is known
+                String partitionName =
+                        lakeTableSnapshot
+                                .getPartitionNameIdByPartitionId()
+                                .get(tableBucket.getPartitionId());
+                if (partitionName != null) {
+                    generator.writeStringField(PARTITION_NAME, partitionName);
+                }
             }
             generator.writeNumberField(BUCKET_ID, tableBucket.getBucket());
 
@@ -91,6 +100,7 @@ public class LakeTableSnapshotJsonSerde
         Iterator<JsonNode> buckets = node.get(BUCKETS).elements();
         Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
         Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+        Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
         while (buckets.hasNext()) {
             JsonNode bucket = buckets.next();
             TableBucket tableBucket;
@@ -109,7 +119,17 @@ public class LakeTableSnapshotJsonSerde
             } else {
                 bucketLogEndOffset.put(tableBucket, null);
             }
+
+            if (partitionId != null && bucket.get(PARTITION_NAME) != null) {
+                partitionNameIdByPartitionId.put(
+                        tableBucket.getPartitionId(), 
bucket.get(PARTITION_NAME).asText());
+            }
         }
-        return new LakeTableSnapshot(snapshotId, tableId, 
bucketLogStartOffset, bucketLogEndOffset);
+        return new LakeTableSnapshot(
+                snapshotId,
+                tableId,
+                bucketLogStartOffset,
+                bucketLogEndOffset,
+                partitionNameIdByPartitionId);
     }
 }
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
index 77bd44bae..3617f56cc 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/replica/CommitLakeTableSnapshotITCase.java
@@ -38,6 +38,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -131,7 +132,11 @@ class CommitLakeTableSnapshotITCase {
 
         LakeTableSnapshot expectedDataLakeTieredInfo =
                 new LakeTableSnapshot(
-                        snapshotId, tableId, bucketsLogStartOffset, 
bucketsLogEndOffset);
+                        snapshotId,
+                        tableId,
+                        bucketsLogStartOffset,
+                        bucketsLogEndOffset,
+                        Collections.emptyMap());
         checkLakeTableDataInZk(tableId, expectedDataLakeTieredInfo);
     }
 
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
index 82779f700..f5c15cfa1 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/LakeTableSnapshotJsonSerdeTest.java
@@ -34,7 +34,12 @@ class LakeTableSnapshotJsonSerdeTest extends 
JsonSerdeTestBase<LakeTableSnapshot
     @Override
     protected LakeTableSnapshot[] createObjects() {
         LakeTableSnapshot lakeTableSnapshot1 =
-                new LakeTableSnapshot(1, 1L, Collections.emptyMap(), 
Collections.emptyMap());
+                new LakeTableSnapshot(
+                        1,
+                        1L,
+                        Collections.emptyMap(),
+                        Collections.emptyMap(),
+                        Collections.emptyMap());
 
         long tableId = 4;
         Map<TableBucket, Long> bucketLogStartOffset = new HashMap<>();
@@ -45,10 +50,18 @@ class LakeTableSnapshotJsonSerdeTest extends 
JsonSerdeTestBase<LakeTableSnapshot
         bucketLogEndOffset.put(new TableBucket(tableId, 2), 4L);
 
         LakeTableSnapshot lakeTableSnapshot2 =
-                new LakeTableSnapshot(2, tableId, bucketLogStartOffset, 
bucketLogEndOffset);
+                new LakeTableSnapshot(
+                        2,
+                        tableId,
+                        bucketLogStartOffset,
+                        bucketLogEndOffset,
+                        Collections.emptyMap());
 
         tableId = 5;
         bucketLogStartOffset = new HashMap<>();
+        Map<Long, String> partitionNameIdByPartitionId = new HashMap<>();
+        partitionNameIdByPartitionId.put(1L, "partition1");
+        partitionNameIdByPartitionId.put(2L, "partition2");
         bucketLogStartOffset.put(new TableBucket(tableId, 1L, 1), 1L);
         bucketLogStartOffset.put(new TableBucket(tableId, 2L, 1), 2L);
 
@@ -57,7 +70,12 @@ class LakeTableSnapshotJsonSerdeTest extends 
JsonSerdeTestBase<LakeTableSnapshot
         bucketLogEndOffset.put(new TableBucket(tableId, 2L, 1), 4L);
 
         LakeTableSnapshot lakeTableSnapshot3 =
-                new LakeTableSnapshot(3, tableId, bucketLogStartOffset, 
bucketLogEndOffset);
+                new LakeTableSnapshot(
+                        3,
+                        tableId,
+                        bucketLogStartOffset,
+                        bucketLogEndOffset,
+                        partitionNameIdByPartitionId);
 
         return new LakeTableSnapshot[] {
             lakeTableSnapshot1, lakeTableSnapshot2, lakeTableSnapshot3,
@@ -72,8 +90,8 @@ class LakeTableSnapshotJsonSerdeTest extends 
JsonSerdeTestBase<LakeTableSnapshot
                     + 
"\"buckets\":[{\"bucket_id\":2,\"log_start_offset\":2,\"log_end_offset\":4},"
                     + 
"{\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3}]}",
             "{\"version\":1,\"snapshot_id\":3,\"table_id\":5,"
-                    + 
"\"buckets\":[{\"partition_id\":1,\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3},"
-                    + 
"{\"partition_id\":2,\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4}]}"
+                    + 
"\"buckets\":[{\"partition_id\":1,\"partition_name\":\"partition1\",\"bucket_id\":1,\"log_start_offset\":1,\"log_end_offset\":3},"
+                    + 
"{\"partition_id\":2,\"partition_name\":\"partition2\",\"bucket_id\":1,\"log_start_offset\":2,\"log_end_offset\":4}]}"
         };
     }
 }

Reply via email to