luoyuxia commented on code in PR #2326: URL: https://github.com/apache/fluss/pull/2326#discussion_r2785645612
########## fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/DvTableReadableSnapshotRetriever.java: ########## @@ -0,0 +1,620 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.paimon.utils; + +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; +import org.apache.fluss.client.metadata.LakeSnapshot; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.committer.LakeCommitResult; +import org.apache.fluss.metadata.PartitionInfo; +import org.apache.fluss.metadata.ResolvedPartitionSpec; +import org.apache.fluss.metadata.TableBucket; +import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.utils.types.Tuple2; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.FileStoreScan; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.ScanMode; +import 
org.apache.paimon.utils.SnapshotManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.fluss.lake.paimon.utils.PaimonDvTableUtils.findLatestSnapshotExactlyHoldingL0Files; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * A retriever to retrieve the readable snapshot and offsets for Paimon deletion vector enabled + * table. + */ +public class DvTableReadableSnapshotRetriever implements AutoCloseable { + + private static final Logger LOG = + LoggerFactory.getLogger(DvTableReadableSnapshotRetriever.class); + + private final TablePath tablePath; + private final long tableId; + private final FileStoreTable fileStoreTable; + private final Admin flussAdmin; + private final Connection flussConnection; + private final SnapshotManager snapshotManager; + + public DvTableReadableSnapshotRetriever( + TablePath tablePath, + long tableId, + FileStoreTable paimonFileStoreTable, + Configuration flussConfig) { + this.tablePath = tablePath; + this.tableId = tableId; + this.fileStoreTable = paimonFileStoreTable; + this.flussConnection = ConnectionFactory.createConnection(flussConfig); + this.flussAdmin = flussConnection.getAdmin(); + this.snapshotManager = fileStoreTable.snapshotManager(); + } + + /** + * Get readable offsets for DV tables based on the latest compacted snapshot. + * + * <p>For Paimon DV tables, when an appended snapshot is committed, we need to check the latest + * compacted snapshot to determine readable offsets for each bucket. 
This method implements + * incremental advancement of readable_snapshot per bucket: + * + * <ul> + * <li>For buckets without L0 files: use offsets from the latest tiered snapshot. These + * buckets can advance their readable offsets since all their data is in base files (L1+). + * <li>For buckets with L0 files: traverse backwards through compacted snapshots to find the + * latest one that flushed this bucket's L0 files. Then find the latest snapshot that + * exactly holds those flushed L0 files, and use the previous APPEND snapshot's offset for + * that bucket. + * </ul> + * + * <p>Algorithm: + * + * <ol> + * <li>Find the latest compacted snapshot before the given tiered snapshot + * <li>Check which buckets have no L0 files and which have L0 files in the compacted snapshot + * <li>For buckets without L0 files: use offsets from the latest tiered snapshot (all data is + * in base files, safe to advance) + * <li>For buckets with L0 files: + * <ol> + * <li>Traverse backwards through compacted snapshots starting from the latest one + * <li>For each compacted snapshot, check which buckets had their L0 files flushed + * <li>For each flushed bucket, find the latest snapshot that exactly holds those L0 + * files using {@link PaimonDvTableUtils#findLatestSnapshotExactlyHoldingL0Files} + * <li>Find the previous APPEND snapshot before that snapshot + * <li>Use that APPEND snapshot's offset for the bucket + * </ol> + * <li>Return readable offsets for all buckets, allowing incremental advancement + * </ol> + * + * <p>Note: This allows readable_snapshot to advance incrementally per bucket. Each bucket's + * readable offset is set to the maximum offset that is actually readable in the compacted + * snapshot, ensuring no data duplication or loss. The readable_snapshot is set to the latest + * compacted snapshot ID, and each bucket continues reading from its respective readable offset. 
+ * + * <p>Example: If bucket0's L0 files were flushed in snapshot5 (which compacted snapshot1's L0 + * files), and snapshot4 is the latest snapshot that exactly holds those L0 files, then + * bucket0's readable offset will be set to snapshot4's previous APPEND snapshot's offset. + * + * @param tieredSnapshotId the tiered snapshot ID (the appended snapshot that was just + * committed) + * @return a tuple containing the readable snapshot ID (the latest compacted snapshot) and a map + * of TableBucket to readable offset for all buckets, or null if: + * <ul> + * <li>No compacted snapshot exists before the tiered snapshot + * <li>Cannot find the latest snapshot holding flushed L0 files for some buckets + * <li>Cannot find the previous APPEND snapshot for some buckets + * <li>Cannot find offsets in Fluss for some buckets + * </ul> + * The map contains offsets for ALL buckets, allowing incremental advancement. + * @throws IOException if an error occurs reading snapshots or offsets from Fluss + */ + @Nullable + public ReadableSnapshotResult getReadableSnapshotAndOffsets(long tieredSnapshotId) + throws IOException { + // Find the latest compacted snapshot + Snapshot latestCompactedSnapshot = + findPreviousSnapshot(tieredSnapshotId, Snapshot.CommitKind.COMPACT); + if (latestCompactedSnapshot == null) { + // No compacted snapshot found, may happen when no compaction happens or snapshot + // expiration, we can't update readable offsets, return null directly + LOG.info( + "Can't find latest compacted snapshot before snapshot {}, skip get readable snapshot.", + tieredSnapshotId); + return null; + } + + Map<TableBucket, Long> readableOffsets = new HashMap<>(); + + FlussTableBucketMapper flussTableBucketMapper = new FlussTableBucketMapper(); + + // get all the bucket without l0 files and with l0 files + Tuple2<Set<PartitionBucket>, Set<PartitionBucket>> bucketsWithoutL0AndWithL0 = + getBucketsWithAndWithoutL0AndWithL0(latestCompactedSnapshot); + Set<PartitionBucket> 
bucketsWithoutL0 = bucketsWithoutL0AndWithL0.f0; + Set<PartitionBucket> bucketsWithL0 = bucketsWithoutL0AndWithL0.f1; + + // Track the earliest previousAppendSnapshot ID that was accessed + // This represents the oldest snapshot that might still be needed + long earliestSnapshotIdToKeep = LakeCommitResult.KEEP_ALL_PREVIOUS; + + if (!bucketsWithoutL0.isEmpty()) { + // Get latest tiered offsets + LakeSnapshot latestTieredSnapshot; + try { + latestTieredSnapshot = flussAdmin.getLatestLakeSnapshot(tablePath).get(); Review Comment: Good suggestion. I think we can do the check to fast-break the time-consuming loop. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
