This is an automated email from the ASF dual-hosted git repository.
mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d37817984 [hotfix] Fix unstable test IcebergTieringITCase (#1584)
d37817984 is described below
commit d37817984fa89bb7a446fcc2cb809bdffb283f56
Author: yuxia Luo <[email protected]>
AuthorDate: Tue Aug 26 14:04:45 2025 +0800
[hotfix] Fix unstable test IcebergTieringITCase (#1584)
* [hotfix] Fix unstable test IcebergTieringITCase
* address comments
---
.../lake/iceberg/tiering/IcebergLakeCommitter.java | 37 ++++++++++++++--------
1 file changed, 23 insertions(+), 14 deletions(-)
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index 2df44d3de..b87081ad5 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -44,6 +44,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -55,6 +56,7 @@ import static org.apache.fluss.utils.Preconditions.checkNotNull;
/** Implementation of {@link LakeCommitter} for Iceberg. */
public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult,
IcebergCommittable> {
+ private static final String COMMITTER_USER = "commit-user";
private final Catalog icebergCatalog;
private final Table icebergTable;
@@ -139,7 +141,7 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
private void addFlussProperties(
SnapshotUpdate<?> operation, Map<String, String>
snapshotProperties) {
- operation.set("commit-user", FLUSS_LAKE_TIERING_COMMIT_USER);
+ operation.set(COMMITTER_USER, FLUSS_LAKE_TIERING_COMMIT_USER);
for (Map.Entry<String, String> entry : snapshotProperties.entrySet()) {
operation.set(entry.getKey(), entry.getValue());
}
@@ -173,9 +175,19 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
}
// Check if there's a gap between Fluss and Iceberg snapshots
- if (latestLakeSnapshotIdOfFluss != null
- && latestLakeSnapshot.snapshotId() <= latestLakeSnapshotIdOfFluss) {
- return null;
+ if (latestLakeSnapshotIdOfFluss != null) {
+ Snapshot latestLakeSnapshotOfFluss = icebergTable.snapshot(latestLakeSnapshotIdOfFluss);
+ if (latestLakeSnapshotOfFluss == null) {
+ throw new IllegalStateException(
+ "Referenced Fluss snapshot "
+ + latestLakeSnapshotIdOfFluss
+ + " not found in Iceberg table");
+ }
+ // note: we need to use sequence number to compare,
+ // we can't use snapshot id as the snapshot id is not ordered
+ if (latestLakeSnapshot.sequenceNumber() <= latestLakeSnapshotOfFluss.sequenceNumber()) {
+ return null;
+ }
}
CommittedLakeSnapshot committedLakeSnapshot =
@@ -237,20 +249,17 @@ public class IcebergLakeCommitter implements LakeCommitter<IcebergWriteResult, I
icebergTable.refresh();
// Find the latest snapshot committed by Fluss
- Iterable<Snapshot> snapshots = icebergTable.snapshots();
- Snapshot latestFlussSnapshot = null;
-
+ List<Snapshot> snapshots = (List<Snapshot>) icebergTable.snapshots();
+ // snapshots() returns snapshots in chronological order (oldest to newest), Reverse to find
+ // most recent snapshot committed by Fluss
+ Collections.reverse(snapshots);
for (Snapshot snapshot : snapshots) {
Map<String, String> summary = snapshot.summary();
- if (summary != null && commitUser.equals(summary.get("commit-user"))) {
- if (latestFlussSnapshot == null
- || snapshot.snapshotId() > latestFlussSnapshot.snapshotId()) {
- latestFlussSnapshot = snapshot;
- }
+ if (summary != null && commitUser.equals(summary.get(COMMITTER_USER))) {
+ return snapshot;
}
}
-
- return latestFlussSnapshot;
+ return null;
}
/** A {@link Listener} to listen the iceberg create snapshot event. */