This is an automated email from the ASF dual-hosted git repository.

mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d37817984 [hotfix] Fix unstable test IcebergTieringITCase (#1584)
d37817984 is described below

commit d37817984fa89bb7a446fcc2cb809bdffb283f56
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 26 14:04:45 2025 +0800

    [hotfix] Fix unstable test IcebergTieringITCase (#1584)
    
    * [hotfix] Fix unstable test IcebergTieringITCase
    
    * address comments
---
 .../lake/iceberg/tiering/IcebergLakeCommitter.java | 37 ++++++++++++++--------
 1 file changed, 23 insertions(+), 14 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 2df44d3de..b87081ad5 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -44,6 +44,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -55,6 +56,7 @@ import static 
org.apache.fluss.utils.Preconditions.checkNotNull;
 
 /** Implementation of {@link LakeCommitter} for Iceberg. */
 public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, 
IcebergCommittable> {
+    private static final String COMMITTER_USER = "commit-user";
 
     private final Catalog icebergCatalog;
     private final Table icebergTable;
@@ -139,7 +141,7 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
 
     private void addFlussProperties(
             SnapshotUpdate<?> operation, Map<String, String> 
snapshotProperties) {
-        operation.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+        operation.set(COMMITTER_USER, FLUSS_LAKE_TIERING_COMMIT_USER);
         for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
             operation.set(entry.getKey(), entry.getValue());
         }
@@ -173,9 +175,19 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
         }
 
         // Check if there's a gap between Fluss and Iceberg snapshots
-        if (latestLakeSnapshotIdOfFluss != null
-                && latestLakeSnapshot.snapshotId() <= 
latestLakeSnapshotIdOfFluss) {
-            return null;
+        if (latestLakeSnapshotIdOfFluss != null) {
+            Snapshot latestLakeSnapshotOfFluss = 
icebergTable.snapshot(latestLakeSnapshotIdOfFluss);
+            if (latestLakeSnapshotOfFluss == null) {
+                throw new IllegalStateException(
+                        "Referenced Fluss snapshot "
+                                + latestLakeSnapshotIdOfFluss
+                                + " not found in Iceberg table");
+            }
+            // note: we need to use sequence number to compare,
+            // we can't use snapshot id as the snapshot id is not ordered
+            if (latestLakeSnapshot.sequenceNumber() <= 
latestLakeSnapshotOfFluss.sequenceNumber()) {
+                return null;
+            }
         }
 
         CommittedLakeSnapshot committedLakeSnapshot =
@@ -237,20 +249,17 @@ public class IcebergLakeCommitter implements 
LakeCommitter<IcebergWriteResult, I
         icebergTable.refresh();
 
         // Find the latest snapshot committed by Fluss
-        Iterable<Snapshot> snapshots = icebergTable.snapshots();
-        Snapshot latestFlussSnapshot = null;
-
+        List<Snapshot> snapshots = (List<Snapshot>) icebergTable.snapshots();
+        // snapshots() returns snapshots in chronological order (oldest to 
newest), Reverse to find
+        // most recent snapshot committed by Fluss
+        Collections.reverse(snapshots);
         for (Snapshot snapshot : snapshots) {
             Map<String, String> summary = snapshot.summary();
-            if (summary != null && 
commitUser.equals(summary.get("commit-user"))) {
-                if (latestFlussSnapshot == null
-                        || snapshot.snapshotId() > 
latestFlussSnapshot.snapshotId()) {
-                    latestFlussSnapshot = snapshot;
-                }
+            if (summary != null && 
commitUser.equals(summary.get(COMMITTER_USER))) {
+                return snapshot;
             }
         }
-
-        return latestFlussSnapshot;
+        return null;
     }
 
     /** A {@link Listener} to listen the iceberg create snapshot event. */

Reply via email to