This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 5075fec2 [paimon] Store lake synchronized bucket offsets in Paimon 
snapshot property (#1405)
5075fec2 is described below

commit 5075fec2d0b5ef64236d4777188bbe0ed5e0519c
Author: xx789 <[email protected]>
AuthorDate: Sun Aug 3 11:01:14 2025 +0800

    [paimon] Store lake synchronized bucket offsets in Paimon snapshot property 
(#1405)
---
 .../alibaba/fluss/lake/committer/BucketOffset.java |  79 +++++++++++++
 .../lake/committer/CommittedLakeSnapshot.java      |  10 +-
 .../fluss/lake/committer/LakeCommitter.java        |   5 +-
 .../fluss/utils/json/BucketOffsetJsonSerde.java    |  72 ++++++++++++
 .../utils/json/BucketOffsetJsonSerdeTest.java      |  43 +++++++
 .../committer/FlussTableLakeSnapshotCommitter.java |  10 +-
 .../tiering/committer/TieringCommitOperator.java   |  91 +++++++++++++--
 .../flink/tiering/TestingLakeTieringFactory.java   |   4 +-
 .../FlussTableLakeSnapshotCommitterTest.java       |  10 +-
 .../committer/TieringCommitOperatorTest.java       |  14 +--
 .../lake/paimon/tiering/PaimonLakeCommitter.java   | 125 ++++++++-------------
 .../testutils/FlinkPaimonTieringTestBase.java      |  15 +++
 .../lake/paimon/tiering/PaimonTieringITCase.java   |  26 +++++
 .../lake/paimon/tiering/PaimonTieringTest.java     |  83 +++++++++-----
 14 files changed, 446 insertions(+), 141 deletions(-)

diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
new file mode 100644
index 00000000..d90c4805
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/BucketOffset.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.committer;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** The bucket offset information expected to be stored in the lake's 
snapshot property. */
+public class BucketOffset implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    public static final String FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY = 
"fluss-offsets";
+
+    private final long logOffset;
+    private final int bucket;
+    private final @Nullable Long partitionId;
+    private final @Nullable String partitionName;
+
+    public BucketOffset(
+            long logOffset,
+            int bucket,
+            @Nullable Long partitionId,
+            @Nullable String partitionName) {
+        this.logOffset = logOffset;
+        this.bucket = bucket;
+        this.partitionId = partitionId;
+        this.partitionName = partitionName;
+    }
+
+    public long getLogOffset() {
+        return logOffset;
+    }
+
+    public int getBucket() {
+        return bucket;
+    }
+
+    @Nullable
+    public Long getPartitionId() {
+        return partitionId;
+    }
+
+    @Nullable
+    public String getPartitionName() {
+        return partitionName;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BucketOffset that = (BucketOffset) o;
+        return bucket == that.bucket
+                && logOffset == that.logOffset
+                && Objects.equals(partitionId, that.partitionId)
+                && Objects.equals(partitionName, that.partitionName);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
index 6eb48bda..a0d94b14 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/CommittedLakeSnapshot.java
@@ -30,9 +30,9 @@ import java.util.Objects;
 public class CommittedLakeSnapshot {
 
     private final long lakeSnapshotId;
-    // <partition_name, bucket> -> log offset, partition_name will be null if 
it's not a
+    // <partition_id, bucket> -> log offset, partition_id will be null if it's 
not a
     // partition bucket
-    private final Map<Tuple2<String, Integer>, Long> logEndOffsets = new 
HashMap<>();
+    private final Map<Tuple2<Long, Integer>, Long> logEndOffsets = new 
HashMap<>();
 
     public CommittedLakeSnapshot(long lakeSnapshotId) {
         this.lakeSnapshotId = lakeSnapshotId;
@@ -46,11 +46,11 @@ public class CommittedLakeSnapshot {
         logEndOffsets.put(Tuple2.of(null, bucketId), offset);
     }
 
-    public void addPartitionBucket(String partitionName, int bucketId, long 
offset) {
-        logEndOffsets.put(Tuple2.of(partitionName, bucketId), offset);
+    public void addPartitionBucket(Long partitionId, int bucketId, long 
offset) {
+        logEndOffsets.put(Tuple2.of(partitionId, bucketId), offset);
     }
 
-    public Map<Tuple2<String, Integer>, Long> getLogEndOffsets() {
+    public Map<Tuple2<Long, Integer>, Long> getLogEndOffsets() {
         return logEndOffsets;
     }
 
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
index d49ee689..77b881dc 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/lake/committer/LakeCommitter.java
@@ -23,6 +23,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * The LakeCommitter interface for committing write results. It extends the 
AutoCloseable interface
@@ -48,10 +49,12 @@ public interface LakeCommitter<WriteResult, CommittableT> 
extends AutoCloseable
      * Commits the given committable object.
      *
      * @param committable the committable object
+     * @param snapshotProperties the properties that the lake supports 
storing in its snapshot
      * @return the committed snapshot ID
      * @throws IOException if an I/O error occurs
      */
-    long commit(CommittableT committable) throws IOException;
+    long commit(CommittableT committable, Map<String, String> 
snapshotProperties)
+            throws IOException;
 
     /**
      * Aborts the given committable object.
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
new file mode 100644
index 00000000..f5fa7120
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerde.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.utils.json;
+
+import com.alibaba.fluss.lake.committer.BucketOffset;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+
+/** Json serializer and deserializer for {@link BucketOffset}. */
+public class BucketOffsetJsonSerde
+        implements JsonSerializer<BucketOffset>, 
JsonDeserializer<BucketOffset> {
+
+    public static final BucketOffsetJsonSerde INSTANCE = new 
BucketOffsetJsonSerde();
+    private static final String PARTITION_ID = "partition_id";
+    private static final String BUCKET_ID = "bucket_id";
+    private static final String PARTITION_NAME = "partition_name";
+    private static final String LOG_OFFSET = "log_offset";
+
+    @Override
+    public BucketOffset deserialize(JsonNode node) {
+        JsonNode partitionIdNode = node.get(PARTITION_ID);
+        Long partitionId = partitionIdNode == null ? null : 
partitionIdNode.asLong();
+        int bucketId = node.get(BUCKET_ID).asInt();
+
+        // deserialize partition name
+        JsonNode partitionNameNode = node.get(PARTITION_NAME);
+        String partitionName = partitionNameNode == null ? null : 
partitionNameNode.asText();
+
+        // deserialize log offset
+        long logOffset = node.get(LOG_OFFSET).asLong();
+
+        return new BucketOffset(logOffset, bucketId, partitionId, 
partitionName);
+    }
+
+    @Override
+    public void serialize(BucketOffset bucketOffset, JsonGenerator generator) 
throws IOException {
+        generator.writeStartObject();
+
+        // write partition id
+        if (bucketOffset.getPartitionId() != null) {
+            generator.writeNumberField(PARTITION_ID, 
bucketOffset.getPartitionId());
+        }
+        generator.writeNumberField(BUCKET_ID, bucketOffset.getBucket());
+
+        // serialize partition name
+        if (bucketOffset.getPartitionName() != null) {
+            generator.writeStringField(PARTITION_NAME, 
bucketOffset.getPartitionName());
+        }
+
+        // serialize log offset
+        generator.writeNumberField(LOG_OFFSET, bucketOffset.getLogOffset());
+
+        generator.writeEndObject();
+    }
+}
diff --git 
a/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
 
b/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
new file mode 100644
index 00000000..7f267b16
--- /dev/null
+++ 
b/fluss-common/src/test/java/com/alibaba/fluss/utils/json/BucketOffsetJsonSerdeTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.utils.json;
+
+import com.alibaba.fluss.lake.committer.BucketOffset;
+
+/** Test for {@link BucketOffsetJsonSerde}. */
+public class BucketOffsetJsonSerdeTest extends JsonSerdeTestBase<BucketOffset> 
{
+
+    BucketOffsetJsonSerdeTest() {
+        super(BucketOffsetJsonSerde.INSTANCE);
+    }
+
+    @Override
+    protected BucketOffset[] createObjects() {
+        return new BucketOffset[] {
+            new BucketOffset(10, 1, 1L, "eu-central$2023$12"), new 
BucketOffset(20, 2, null, null)
+        };
+    }
+
+    @Override
+    protected String[] expectedJsons() {
+        return new String[] {
+            
"{\"partition_id\":1,\"bucket_id\":1,\"partition_name\":\"eu-central$2023$12\",\"log_offset\":10}",
+            "{\"bucket_id\":2,\"log_offset\":20}"
+        };
+    }
+}
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
index 5c11027f..77a08e83 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitter.java
@@ -85,18 +85,14 @@ public class FlussTableLakeSnapshotCommitter implements 
AutoCloseable {
         // construct lake snapshot to commit to Fluss
         FlussTableLakeSnapshot flussTableLakeSnapshot =
                 new FlussTableLakeSnapshot(tableId, 
committedLakeSnapshot.getLakeSnapshotId());
-        for (Map.Entry<Tuple2<String, Integer>, Long> entry :
+        for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
                 committedLakeSnapshot.getLogEndOffsets().entrySet()) {
-            Tuple2<String, Integer> partitionBucket = entry.getKey();
+            Tuple2<Long, Integer> partitionBucket = entry.getKey();
             TableBucket tableBucket;
             if (partitionBucket.f0 == null) {
                 tableBucket = new TableBucket(tableId, partitionBucket.f1);
             } else {
-                String partitionName = partitionBucket.f0;
-                // todo: remove this
-                // in paimon 1.12, we can store this offsets(including 
partitionId) into snapshot
-                // properties, then, we won't need to get partitionId from 
partition name
-                Long partitionId = partitionIdByName.get(partitionName);
+                Long partitionId = partitionBucket.f0;
                 if (partitionId != null) {
                     tableBucket = new TableBucket(tableId, partitionId, 
partitionBucket.f1);
                 } else {
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
index 6f6720a9..97b65efc 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperator.java
@@ -20,12 +20,14 @@ package com.alibaba.fluss.flink.tiering.committer;
 import com.alibaba.fluss.client.Connection;
 import com.alibaba.fluss.client.ConnectionFactory;
 import com.alibaba.fluss.client.admin.Admin;
+import com.alibaba.fluss.client.metadata.LakeSnapshot;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.LakeTableSnapshotNotExistException;
 import com.alibaba.fluss.flink.tiering.event.FailedTieringEvent;
 import com.alibaba.fluss.flink.tiering.event.FinishedTieringEvent;
 import com.alibaba.fluss.flink.tiering.source.TableBucketWriteResult;
 import com.alibaba.fluss.flink.tiering.source.TieringSource;
+import com.alibaba.fluss.lake.committer.BucketOffset;
 import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
 import com.alibaba.fluss.lake.committer.LakeCommitter;
 import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -34,7 +36,10 @@ import com.alibaba.fluss.metadata.PartitionInfo;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonFactory;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import com.alibaba.fluss.utils.ExceptionUtils;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
 
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
@@ -45,6 +50,8 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -53,6 +60,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static 
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static com.alibaba.fluss.utils.Preconditions.checkState;
 
 /**
@@ -64,8 +72,8 @@ import static 
com.alibaba.fluss.utils.Preconditions.checkState;
  *
  * <p>When it collects all {@link TableBucketWriteResult}s of a round of 
tiering for a table, it
  * will combine all the {@link WriteResult}s to {@link Committable} via method 
{@link
- * LakeCommitter#toCommittable(List)}, and then call method {@link 
LakeCommitter#commit(Object)} to
- * commit to lake.
+ * LakeCommitter#toCommittable(List)}, and then call method {@link 
LakeCommitter#commit(Object,
+ * Map)} to commit to lake.
  *
  * <p>Finally, it will also commit the commited lake snapshot to Fluss cluster 
to make Fluss aware
  * of the tiering progress.
@@ -82,6 +90,7 @@ public class TieringCommitOperator<WriteResult, Committable>
     private final FlussTableLakeSnapshotCommitter 
flussTableLakeSnapshotCommitter;
     private Connection connection;
     private Admin admin;
+    private static final JsonFactory JACKSON_FACTORY = new JsonFactory();
 
     // gateway to send event to flink source coordinator
     private final OperatorEventGateway operatorEventGateway;
@@ -179,11 +188,23 @@ public class TieringCommitOperator<WriteResult, 
Committable>
                     committableWriteResults.stream()
                             .map(TableBucketWriteResult::writeResult)
                             .collect(Collectors.toList());
+
+            LakeSnapshot flussCurrentLakeSnapshot = 
getLatestLakeSnapshot(tablePath);
+            Map<TableBucket, Long> logOffsets =
+                    mergeTableBucketOffsets(flussCurrentLakeSnapshot, 
committableWriteResults);
+
             // to committable
             Committable committable = 
lakeCommitter.toCommittable(writeResults);
             // before commit to lake, check fluss not missing any lake 
snapshot commited by fluss
-            checkFlussNotMissingLakeSnapshot(tablePath, lakeCommitter, 
committable);
-            long commitedSnapshotId = lakeCommitter.commit(committable);
+            checkFlussNotMissingLakeSnapshot(
+                    tablePath,
+                    lakeCommitter,
+                    committable,
+                    flussCurrentLakeSnapshot == null
+                            ? null
+                            : flussCurrentLakeSnapshot.getSnapshotId());
+            long commitedSnapshotId =
+                    lakeCommitter.commit(committable, 
toBucketOffsetsProperty(logOffsets));
             // commit to fluss
             Map<TableBucket, Long> logEndOffsets = new HashMap<>();
             for (TableBucketWriteResult<WriteResult> writeResult : 
committableWriteResults) {
@@ -195,14 +216,53 @@ public class TieringCommitOperator<WriteResult, 
Committable>
         }
     }
 
-    private void checkFlussNotMissingLakeSnapshot(
-            TablePath tablePath,
-            LakeCommitter<WriteResult, Committable> lakeCommitter,
-            Committable committable)
-            throws Exception {
-        Long flussCurrentLakeSnapshot;
+    public static Map<String, String> toBucketOffsetsProperty(
+            Map<TableBucket, Long> tableBucketOffsets) throws IOException {
+        StringWriter sw = new StringWriter();
+        try (JsonGenerator gen = JACKSON_FACTORY.createGenerator(sw)) {
+            gen.writeStartArray();
+            for (Map.Entry<TableBucket, Long> entry : 
tableBucketOffsets.entrySet()) {
+                BucketOffsetJsonSerde.INSTANCE.serialize(
+                        new BucketOffset(
+                                entry.getValue(),
+                                entry.getKey().getBucket(),
+                                entry.getKey().getPartitionId(),
+                                // todo: fill partition name in #1448
+                                null),
+                        gen);
+            }
+            gen.writeEndArray();
+        }
+        return new HashMap<String, String>() {
+            {
+                put(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, sw.toString());
+            }
+        };
+    }
+
+    /**
+     * Merge the log offsets of latest snapshot with current written bucket 
offsets to get full log
+     * offsets.
+     */
+    private Map<TableBucket, Long> mergeTableBucketOffsets(
+            @Nullable LakeSnapshot latestLakeSnapshot,
+            List<TableBucketWriteResult<WriteResult>> currentWriteResults) {
+        Map<TableBucket, Long> tableBucketOffsets =
+                latestLakeSnapshot == null
+                        ? new HashMap<>()
+                        : new 
HashMap<>(latestLakeSnapshot.getTableBucketsOffset());
+        for (TableBucketWriteResult<WriteResult> tableBucketWriteResult : 
currentWriteResults) {
+            tableBucketOffsets.put(
+                    tableBucketWriteResult.tableBucket(), 
tableBucketWriteResult.logEndOffset());
+        }
+        return tableBucketOffsets;
+    }
+
+    @Nullable
+    private LakeSnapshot getLatestLakeSnapshot(TablePath tablePath) throws 
Exception {
+        LakeSnapshot flussCurrentLakeSnapshot;
         try {
-            flussCurrentLakeSnapshot = 
admin.getLatestLakeSnapshot(tablePath).get().getSnapshotId();
+            flussCurrentLakeSnapshot = 
admin.getLatestLakeSnapshot(tablePath).get();
         } catch (Exception e) {
             Throwable throwable = e.getCause();
             if (throwable instanceof LakeTableSnapshotNotExistException) {
@@ -212,6 +272,15 @@ public class TieringCommitOperator<WriteResult, 
Committable>
                 throw e;
             }
         }
+        return flussCurrentLakeSnapshot;
+    }
+
+    private void checkFlussNotMissingLakeSnapshot(
+            TablePath tablePath,
+            LakeCommitter<WriteResult, Committable> lakeCommitter,
+            Committable committable,
+            Long flussCurrentLakeSnapshot)
+            throws Exception {
 
         // get Fluss missing lake snapshot in Lake
         CommittedLakeSnapshot missingCommittedSnapshot =
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
index 9e5097bc..f81d018e 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/TestingLakeTieringFactory.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /** An implementation of {@link LakeTieringFactory} for testing purpose. */
 public class TestingLakeTieringFactory
@@ -118,7 +119,8 @@ public class TestingLakeTieringFactory
         }
 
         @Override
-        public long commit(TestingCommittable committable) throws IOException {
+        public long commit(TestingCommittable committable, Map<String, String> 
snapshotProperties)
+                throws IOException {
             return ++currentSnapshot;
         }
 
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
index 0d9d1b0e..26b3580d 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/FlussTableLakeSnapshotCommitterTest.java
@@ -71,14 +71,14 @@ class FlussTableLakeSnapshotCommitterTest extends 
FlinkTestBase {
                                 ? DATA1_PARTITIONED_TABLE_DESCRIPTOR
                                 : DATA1_TABLE_DESCRIPTOR);
 
-        List<String> partitions;
+        List<Long> partitions;
         Map<String, Long> partitionNameAndIds = Collections.emptyMap();
         if (!isPartitioned) {
             FLUSS_CLUSTER_EXTENSION.waitUntilTableReady(tableId);
             partitions = Collections.singletonList(null);
         } else {
             partitionNameAndIds = 
FLUSS_CLUSTER_EXTENSION.waitUntilPartitionAllReady(tablePath);
-            partitions = new ArrayList<>(partitionNameAndIds.keySet());
+            partitions = new ArrayList<>(partitionNameAndIds.values());
         }
 
         CommittedLakeSnapshot committedLakeSnapshot = new 
CommittedLakeSnapshot(3);
@@ -86,15 +86,13 @@ class FlussTableLakeSnapshotCommitterTest extends 
FlinkTestBase {
         Map<TableBucket, Long> expectedOffsets = new HashMap<>();
         for (int bucket = 0; bucket < 3; bucket++) {
             long bucketOffset = bucket * bucket;
-            for (String partition : partitions) {
+            for (Long partition : partitions) {
                 if (partition == null) {
                     committedLakeSnapshot.addBucket(bucket, bucketOffset);
                     expectedOffsets.put(new TableBucket(tableId, bucket), 
bucketOffset);
                 } else {
                     committedLakeSnapshot.addPartitionBucket(partition, 
bucket, bucketOffset);
-                    expectedOffsets.put(
-                            new TableBucket(tableId, 
partitionNameAndIds.get(partition), bucket),
-                            bucketOffset);
+                    expectedOffsets.put(new TableBucket(tableId, partition, 
bucket), bucketOffset);
                 }
             }
         }
diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
index 647ecfc2..86c43c1f 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/tiering/committer/TieringCommitOperatorTest.java
@@ -327,10 +327,9 @@ class TieringCommitOperatorTest extends FlinkTestBase {
                         mockCommitedSnapshot));
     }
 
-    private CommittedLakeSnapshot mockCommittedLakeSnapshot(
-            List<String> partitions, int snapshotId) {
+    private CommittedLakeSnapshot mockCommittedLakeSnapshot(List<Long> 
partitions, int snapshotId) {
         CommittedLakeSnapshot mockCommittedSnapshot = new 
CommittedLakeSnapshot(snapshotId);
-        for (String partition : partitions) {
+        for (Long partition : partitions) {
             for (int bucket = 0; bucket < DEFAULT_BUCKET_NUM; bucket++) {
                 if (partition == null) {
                     mockCommittedSnapshot.addBucket(bucket, bucket + 1);
@@ -347,18 +346,15 @@ class TieringCommitOperatorTest extends FlinkTestBase {
             CommittedLakeSnapshot committedLakeSnapshot,
             Map<String, Long> partitionIdByName) {
         Map<TableBucket, Long> expectedLogEndOffsets = new HashMap<>();
-        for (Map.Entry<Tuple2<String, Integer>, Long> entry :
+        for (Map.Entry<Tuple2<Long, Integer>, Long> entry :
                 committedLakeSnapshot.getLogEndOffsets().entrySet()) {
-            Tuple2<String, Integer> partitionBucket = entry.getKey();
+            Tuple2<Long, Integer> partitionBucket = entry.getKey();
             if (partitionBucket.f0 == null) {
                 expectedLogEndOffsets.put(
                         new TableBucket(tableId, partitionBucket.f1), 
entry.getValue());
             } else {
                 expectedLogEndOffsets.put(
-                        new TableBucket(
-                                tableId,
-                                partitionIdByName.get(partitionBucket.f0),
-                                partitionBucket.f1),
+                        new TableBucket(tableId, partitionBucket.f0, 
partitionBucket.f1),
                         entry.getValue());
             }
         }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
index c5927100..9d7eed0b 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/tiering/PaimonLakeCommitter.java
@@ -17,37 +17,35 @@
 
 package com.alibaba.fluss.lake.paimon.tiering;
 
+import com.alibaba.fluss.lake.committer.BucketOffset;
 import com.alibaba.fluss.lake.committer.CommittedLakeSnapshot;
 import com.alibaba.fluss.lake.committer.LakeCommitter;
 import com.alibaba.fluss.metadata.TablePath;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import com.alibaba.fluss.utils.json.BucketOffsetJsonSerde;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitCallback;
-import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.utils.SnapshotManager;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
+import static 
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static 
com.alibaba.fluss.lake.paimon.tiering.PaimonLakeTieringFactory.FLUSS_LAKE_TIERING_COMMIT_USER;
 import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
-import static 
com.alibaba.fluss.metadata.ResolvedPartitionSpec.PARTITION_SPEC_SEPARATOR;
-import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
 import static org.apache.paimon.table.sink.BatchWriteBuilder.COMMIT_IDENTIFIER;
 
@@ -59,6 +57,7 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
     private FileStoreCommit fileStoreCommit;
     private final TablePath tablePath;
     private static final ThreadLocal<Long> currentCommitSnapshotId = new 
ThreadLocal<>();
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     public PaimonLakeCommitter(PaimonCatalogProvider paimonCatalogProvider, 
TablePath tablePath)
             throws IOException {
@@ -78,8 +77,11 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
     }
 
     @Override
-    public long commit(PaimonCommittable committable) throws IOException {
+    public long commit(PaimonCommittable committable, Map<String, String> 
snapshotProperties)
+            throws IOException {
         ManifestCommittable manifestCommittable = 
committable.manifestCommittable();
+        snapshotProperties.forEach(manifestCommittable::addProperty);
+
         try {
             fileStoreCommit =
                     fileStoreTable
@@ -110,9 +112,9 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
     @Override
     public CommittedLakeSnapshot getMissingLakeSnapshot(@Nullable Long 
latestLakeSnapshotIdOfFluss)
             throws IOException {
-        Long latestLakeSnapshotIdOfLake =
-                
getCommittedLatestSnapshotIdOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
-        if (latestLakeSnapshotIdOfLake == null) {
+        Snapshot latestLakeSnapshotOfLake =
+                
getCommittedLatestSnapshotOfLake(FLUSS_LAKE_TIERING_COMMIT_USER);
+        if (latestLakeSnapshotOfLake == null) {
             return null;
         }
 
@@ -120,40 +122,51 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
         // but the latest snapshot is not greater than latestLakeSnapshotIdOfFluss,
         // so there is no missing snapshot; return directly
         if (latestLakeSnapshotIdOfFluss != null
-                && latestLakeSnapshotIdOfLake <= latestLakeSnapshotIdOfFluss) {
+                && latestLakeSnapshotOfLake.id() <= 
latestLakeSnapshotIdOfFluss) {
             return null;
         }
 
-        // todo: the temporary way to scan the delta to get the log end offset,
-        // we should read from snapshot's properties in Paimon 1.2
         CommittedLakeSnapshot committedLakeSnapshot =
-                new CommittedLakeSnapshot(latestLakeSnapshotIdOfLake);
-        ScanMode scanMode =
-                fileStoreTable.primaryKeys().isEmpty() ? ScanMode.DELTA : 
ScanMode.CHANGELOG;
-
-        Iterator<ManifestEntry> manifestEntryIterator =
-                fileStoreTable
-                        .store()
-                        .newScan()
-                        .withSnapshot(latestLakeSnapshotIdOfLake)
-                        .withKind(scanMode)
-                        .readFileIterator();
+                new CommittedLakeSnapshot(latestLakeSnapshotOfLake.id());
 
-        int bucketIdColumnIndex = getColumnIndex(BUCKET_COLUMN_NAME);
-        int logOffsetColumnIndex = getColumnIndex(OFFSET_COLUMN_NAME);
-        while (manifestEntryIterator.hasNext()) {
-            updateCommittedLakeSnapshot(
-                    committedLakeSnapshot,
-                    manifestEntryIterator.next(),
-                    bucketIdColumnIndex,
-                    logOffsetColumnIndex);
+        if (latestLakeSnapshotOfLake.properties() == null) {
+            throw new IOException("Failed to load committed lake snapshot 
properties from Paimon.");
         }
 
+        // if resuming from an old tiering service (v0.7), which ran before Paimon
+        // supported snapshot properties, we can't get the properties. But reaching
+        // this point means the tiering service committed the snapshot to the lake
+        // yet failed to commit it to Fluss. We have to notify users to run the old
+        // tiering service (v0.7) again to commit the snapshot to Fluss, after which
+        // tiering can resume with the new tiering service
+        Map<String, String> lakeSnapshotProperties = 
latestLakeSnapshotOfLake.properties();
+        if (lakeSnapshotProperties == null) {
+            throw new IllegalArgumentException(
+                    "Cannot resume tiering from an old version(v0.7) of 
tiering service. "
+                            + "The snapshot was committed to the lake storage 
but failed to commit to Fluss. "
+                            + "To resolve this:\n"
+                            + "1. Run the old tiering service(v0.7) again to 
complete the Fluss commit\n"
+                            + "2. Then you can resume tiering with the newer 
version of tiering service");
+        } else {
+            String flussOffsetProperties =
+                    
lakeSnapshotProperties.get(FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY);
+            for (JsonNode node : 
OBJECT_MAPPER.readTree(flussOffsetProperties)) {
+                BucketOffset bucketOffset = 
BucketOffsetJsonSerde.INSTANCE.deserialize(node);
+                if (bucketOffset.getPartitionId() != null) {
+                    committedLakeSnapshot.addPartitionBucket(
+                            bucketOffset.getPartitionId(),
+                            bucketOffset.getBucket(),
+                            bucketOffset.getLogOffset());
+                } else {
+                    committedLakeSnapshot.addBucket(
+                            bucketOffset.getBucket(), 
bucketOffset.getLogOffset());
+                }
+            }
+        }
         return committedLakeSnapshot;
     }
 
     @Nullable
-    private Long getCommittedLatestSnapshotIdOfLake(String commitUser) throws 
IOException {
+    private Snapshot getCommittedLatestSnapshotOfLake(String commitUser) 
throws IOException {
         // get the latest snapshot committed by fluss or the latest committed id
         SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
         Long userCommittedSnapshotIdOrLatestCommitId =
@@ -172,7 +185,7 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
             // the snapshot is still not committed by Fluss, return directly
             return null;
         }
-        return snapshot.id();
+        return snapshot;
     }
 
     @Override
@@ -226,44 +239,4 @@ public class PaimonLakeCommitter implements 
LakeCommitter<PaimonWriteResult, Pai
             // do-nothing
         }
     }
-
-    private void updateCommittedLakeSnapshot(
-            CommittedLakeSnapshot committedLakeSnapshot,
-            ManifestEntry manifestEntry,
-            int bucketIdColumnIndex,
-            int logOffsetColumnIndex) {
-        // always get bucket, log_offset from statistic
-        DataFileMeta dataFileMeta = manifestEntry.file();
-        BinaryRow maxStatisticRow = dataFileMeta.valueStats().maxValues();
-
-        int bucketId = maxStatisticRow.getInt(bucketIdColumnIndex);
-        long offset = maxStatisticRow.getLong(logOffsetColumnIndex);
-
-        String partition = null;
-        BinaryRow partitionRow = manifestEntry.partition();
-        if (partitionRow.getFieldCount() > 0) {
-            List<String> partitionFields = new 
ArrayList<>(partitionRow.getFieldCount());
-            for (int i = 0; i < partitionRow.getFieldCount(); i++) {
-                partitionFields.add(partitionRow.getString(i).toString());
-            }
-            partition = String.join(PARTITION_SPEC_SEPARATOR, partitionFields);
-        }
-
-        if (partition == null) {
-            committedLakeSnapshot.addBucket(bucketId, offset);
-        } else {
-            committedLakeSnapshot.addPartitionBucket(partition, bucketId, 
offset);
-        }
-    }
-
-    private int getColumnIndex(String columnName) {
-        int columnIndex = 
fileStoreTable.schema().fieldNames().indexOf(columnName);
-        if (columnIndex < 0) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Column '%s' is not found in paimon table %s, the 
columns of the table are %s",
-                            columnIndex, tablePath, 
fileStoreTable.schema().fieldNames()));
-        }
-        return columnIndex;
-    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index b4788d87..d9c0bebd 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -43,6 +43,7 @@ import com.alibaba.fluss.types.DataTypes;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
@@ -460,4 +461,18 @@ public class FlinkPaimonTieringTestBase {
                 
table.newRead().createReader(table.newReadBuilder().newScan().plan());
         return reader.toCloseableIterator();
     }
+
+    protected void checkSnapshotPropertyInPaimon(
+            TablePath tablePath, Map<String, String> expectedProperties) 
throws Exception {
+        FileStoreTable table =
+                (FileStoreTable)
+                        getPaimonCatalog()
+                                .getTable(
+                                        Identifier.create(
+                                                tablePath.getDatabaseName(),
+                                                tablePath.getTableName()));
+        Snapshot snapshot = table.snapshotManager().latestSnapshot();
+        assertThat(snapshot).isNotNull();
+        assertThat(snapshot.properties()).isEqualTo(expectedProperties);
+    }
 }
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 2ee3083f..576d4722 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -42,10 +42,12 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import static 
com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -84,6 +86,16 @@ class PaimonTieringITCase extends FlinkPaimonTieringTestBase 
{
         assertReplicaStatus(t1Bucket, 3);
         // check data in paimon
         checkDataInPaimonPrimayKeyTable(t1, rows);
+        // check snapshot property in paimon
+        Map<String, String> properties =
+                new HashMap<String, String>() {
+                    {
+                        put(
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                "[{\"bucket_id\":0,\"log_offset\":3}]");
+                    }
+                };
+        checkSnapshotPropertyInPaimon(t1, properties);
 
         // then, create another log table
         TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
@@ -146,6 +158,20 @@ class PaimonTieringITCase extends 
FlinkPaimonTieringTestBase {
                     writtenRowsByPartition.get(partitionName),
                     0);
         }
+
+        properties =
+                new HashMap<String, String>() {
+                    {
+                        put(
+                                FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                "["
+                                        + 
"{\"partition_id\":0,\"bucket_id\":0,\"log_offset\":3},"
+                                        + 
"{\"partition_id\":1,\"bucket_id\":0,\"log_offset\":3}"
+                                        + "]");
+                    }
+                };
+        checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+
         jobClient.cancel().get();
     }
 
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
index ae02112f..16984f7f 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/tiering/PaimonTieringTest.java
@@ -66,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static 
com.alibaba.fluss.flink.tiering.committer.TieringCommitOperator.toBucketOffsetsProperty;
 import static com.alibaba.fluss.lake.paimon.utils.PaimonConversions.toPaimon;
 import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -129,13 +130,23 @@ class PaimonTieringTest {
         }
 
         Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new 
HashMap<>();
-        List<String> partitions =
-                isPartitioned ? Arrays.asList("p1", "p2", "p3") : 
Collections.singletonList(null);
+        Map<Long, String> partitionIdAndName =
+                isPartitioned
+                        ? new HashMap<Long, String>() {
+                            {
+                                put(1L, "p1");
+                                put(2L, "p2");
+                                put(3L, "p3");
+                            }
+                        }
+                        : Collections.singletonMap(null, null);
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
         // first, write data
         for (int bucket = 0; bucket < bucketNum; bucket++) {
-            for (String partition : partitions) {
+            for (Map.Entry<Long, String> entry : 
partitionIdAndName.entrySet()) {
+                String partition = entry.getValue();
                 try (LakeWriter<PaimonWriteResult> lakeWriter =
-                        createLakeWriter(tablePath, bucket, partition)) {
+                        createLakeWriter(tablePath, bucket, partition, 
entry.getKey())) {
                     Tuple2<String, Integer> partitionBucket = 
Tuple2.of(partition, bucket);
                     Tuple2<List<LogRecord>, List<LogRecord>> 
writeAndExpectRecords =
                             isPrimaryKeyTable
@@ -144,6 +155,7 @@ class PaimonTieringTest {
                     List<LogRecord> writtenRecords = writeAndExpectRecords.f0;
                     List<LogRecord> expectRecords = writeAndExpectRecords.f1;
                     recordsByBucket.put(partitionBucket, expectRecords);
+                    tableBucketOffsets.put(new TableBucket(0, entry.getKey(), 
bucket), 10L);
                     for (LogRecord logRecord : writtenRecords) {
                         lakeWriter.write(logRecord);
                     }
@@ -166,13 +178,15 @@ class PaimonTieringTest {
             paimonCommittable =
                     committableSerializer.deserialize(
                             committableSerializer.getVersion(), serialized);
-            long snapshot = lakeCommitter.commit(paimonCommittable);
+            long snapshot =
+                    lakeCommitter.commit(
+                            paimonCommittable, 
toBucketOffsetsProperty(tableBucketOffsets));
             assertThat(snapshot).isEqualTo(1);
         }
 
         // then, check data
         for (int bucket = 0; bucket < 3; bucket++) {
-            for (String partition : partitions) {
+            for (String partition : partitionIdAndName.values()) {
                 Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, 
bucket);
                 List<LogRecord> expectRecords = 
recordsByBucket.get(partitionBucket);
                 CloseableIterator<InternalRow> actualRecords =
@@ -192,11 +206,11 @@ class PaimonTieringTest {
             // use snapshot id 0 as the known snapshot id
             CommittedLakeSnapshot committedLakeSnapshot = 
lakeCommitter.getMissingLakeSnapshot(0L);
             assertThat(committedLakeSnapshot).isNotNull();
-            Map<Tuple2<String, Integer>, Long> offsets = 
committedLakeSnapshot.getLogEndOffsets();
+            Map<Tuple2<Long, Integer>, Long> offsets = 
committedLakeSnapshot.getLogEndOffsets();
             for (int bucket = 0; bucket < 3; bucket++) {
-                for (String partition : partitions) {
-                    // we only write 10 records, so expected log offset should 
be 9
-                    assertThat(offsets.get(Tuple2.of(partition, 
bucket))).isEqualTo(9);
+                for (Long partitionId : partitionIdAndName.keySet()) {
+                    // we only write 10 records, so expected log offset should 
be 10
+                    assertThat(offsets.get(Tuple2.of(partitionId, 
bucket))).isEqualTo(10);
                 }
             }
             assertThat(committedLakeSnapshot.getLakeSnapshotId()).isOne();
@@ -221,15 +235,24 @@ class PaimonTieringTest {
 
         Map<String, List<LogRecord>> recordsByPartition = new HashMap<>();
         List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
 
         // Test data for different partitions using $ separator
-        List<String> partitions = Arrays.asList("us-east$2024", 
"us-west$2024", "eu-central$2023");
+        Map<Long, String> partitionIdAndName =
+                new HashMap<Long, String>() {
+                    {
+                        put(1L, "us-east$2024");
+                        put(2L, "us-west$2024");
+                        put(3L, "eu-central$2023");
+                    }
+                };
 
         int bucket = 0;
 
-        for (String partition : partitions) {
+        for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+            String partition = entry.getValue();
             try (LakeWriter<PaimonWriteResult> lakeWriter =
-                    createLakeWriter(tablePath, bucket, partition)) {
+                    createLakeWriter(tablePath, bucket, partition, 
entry.getKey())) {
                 List<LogRecord> logRecords =
                         genLogTableRecordsForMultiPartition(partition, bucket, 
3);
                 recordsByPartition.put(partition, logRecords);
@@ -237,7 +260,7 @@ class PaimonTieringTest {
                 for (LogRecord logRecord : logRecords) {
                     lakeWriter.write(logRecord);
                 }
-
+                tableBucketOffsets.put(new TableBucket(0, entry.getKey(), 
bucket), 3L);
                 PaimonWriteResult result = lakeWriter.complete();
                 paimonWriteResults.add(result);
             }
@@ -247,12 +270,13 @@ class PaimonTieringTest {
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
                 createLakeCommitter(tablePath)) {
             PaimonCommittable committable = 
lakeCommitter.toCommittable(paimonWriteResults);
-            long snapshot = lakeCommitter.commit(committable);
+            long snapshot =
+                    lakeCommitter.commit(committable, 
toBucketOffsetsProperty(tableBucketOffsets));
             assertThat(snapshot).isEqualTo(1);
         }
 
         // Verify data for each partition
-        for (String partition : partitions) {
+        for (String partition : partitionIdAndName.values()) {
             List<LogRecord> expectRecords = recordsByPartition.get(partition);
             CloseableIterator<InternalRow> actualRecords =
                     getPaimonRowsMultiPartition(tablePath, partition);
@@ -268,16 +292,22 @@ class PaimonTieringTest {
 
         Map<String, List<LogRecord>> recordsByPartition = new HashMap<>();
         List<PaimonWriteResult> paimonWriteResults = new ArrayList<>();
+        Map<TableBucket, Long> tableBucketOffsets = new HashMap<>();
 
         // Test data for different three-level partitions using $ separator
-        List<String> partitions =
-                Arrays.asList("us-east$2024$01", "us-east$2024$02", 
"eu-central$2023$12");
-
+        Map<Long, String> partitionIdAndName =
+                new HashMap<Long, String>() {
+                    {
+                        put(1L, "us-east$2024$01");
+                        put(2L, "eu-central$2023$12");
+                    }
+                };
         int bucket = 0;
 
-        for (String partition : partitions) {
+        for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
+            String partition = entry.getValue();
             try (LakeWriter<PaimonWriteResult> lakeWriter =
-                    createLakeWriter(tablePath, bucket, partition)) {
+                    createLakeWriter(tablePath, bucket, partition, 
entry.getKey())) {
                 List<LogRecord> logRecords =
                         genLogTableRecordsForMultiPartition(
                                 partition, bucket, 2); // Use same method
@@ -286,6 +316,7 @@ class PaimonTieringTest {
                 for (LogRecord logRecord : logRecords) {
                     lakeWriter.write(logRecord);
                 }
+                tableBucketOffsets.put(new TableBucket(0, entry.getKey(), 
bucket), 2L);
 
                 PaimonWriteResult result = lakeWriter.complete();
                 paimonWriteResults.add(result);
@@ -296,12 +327,13 @@ class PaimonTieringTest {
         try (LakeCommitter<PaimonWriteResult, PaimonCommittable> lakeCommitter 
=
                 createLakeCommitter(tablePath)) {
             PaimonCommittable committable = 
lakeCommitter.toCommittable(paimonWriteResults);
-            long snapshot = lakeCommitter.commit(committable);
+            long snapshot =
+                    lakeCommitter.commit(committable, 
toBucketOffsetsProperty(tableBucketOffsets));
             assertThat(snapshot).isEqualTo(1);
         }
 
         // Verify data for each partition
-        for (String partition : partitions) {
+        for (String partition : partitionIdAndName.values()) {
             List<LogRecord> expectRecords = recordsByPartition.get(partition);
             CloseableIterator<InternalRow> actualRecords =
                     getPaimonRowsThreePartition(tablePath, partition);
@@ -639,7 +671,8 @@ class PaimonTieringTest {
     }
 
     private LakeWriter<PaimonWriteResult> createLakeWriter(
-            TablePath tablePath, int bucket, @Nullable String partition) 
throws IOException {
+            TablePath tablePath, int bucket, @Nullable String partition, 
@Nullable Long partitionId)
+            throws IOException {
         return paimonLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
                     @Override
@@ -650,7 +683,7 @@ class PaimonTieringTest {
                     @Override
                     public TableBucket tableBucket() {
                         // don't care about tableId & partitionId
-                        return new TableBucket(0, 0L, bucket);
+                        return new TableBucket(0, partitionId, bucket);
                     }
 
                     @Nullable

Reply via email to