This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a33dc6804 [core] Optimize snapshot expire loop for consumers (#2546)
a33dc6804 is described below

commit a33dc68045ffe1e8b351d35ba440b5dfe9b3adb4
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Dec 21 19:01:46 2023 +0800

    [core] Optimize snapshot expire loop for consumers (#2546)
    
    This closes #2546.
---
 .../paimon/operation/FileStoreExpireImpl.java      | 39 ++++++++++++----------
 .../paimon/operation/FileStoreExpireTestBase.java  |  2 +-
 2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index ab6bb5d09..c326eb35e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.function.Predicate;
@@ -112,33 +111,39 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             return;
         }
 
-        // locate the first snapshot between the numRetainedMax th and (numRetainedMin+1) th latest
-        // snapshots to be retained. This snapshot needs to be preserved because it
-        // doesn't fulfill the time threshold condition for expiration.
-        for (long id = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);
-                id <= latestSnapshotId - numRetainedMin;
-                id++) {
+        // the min snapshot to retain from 'snapshot.num-retained.max'
+        // (the maximum number of snapshots to retain)
+        long min = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);
+
+        // the max exclusive snapshot to expire until
+        // protected by 'snapshot.num-retained.min'
+        // (the minimum number of completed snapshots to retain)
+        long maxExclusive = latestSnapshotId - numRetainedMin + 1;
+
+        // the snapshot being read by the consumer cannot be deleted
+        maxExclusive =
+                Math.min(maxExclusive, consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
+
+        // protected by 'snapshot.expire.limit'
+        // (the maximum number of snapshots allowed to expire at a time)
+        maxExclusive = Math.min(maxExclusive, earliest + expireLimit);
+
+        for (long id = min; id < maxExclusive; id++) {
+            // Early exit the loop for 'snapshot.time-retained'
+            // (the maximum time of snapshots to retain)
             if (snapshotManager.snapshotExists(id)
                     && currentMillis - snapshotManager.snapshot(id).timeMillis()
                             <= millisRetained) {
-                // within time threshold, can assume that all snapshots after it are also within
-                // the threshold
                 expireUntil(earliest, id);
                 return;
             }
         }
 
-        // by default, expire until there are only numRetainedMin snapshots left
-        expireUntil(earliest, latestSnapshotId - numRetainedMin + 1);
+        expireUntil(earliest, maxExclusive);
     }
 
     @VisibleForTesting
     public void expireUntil(long earliestId, long endExclusiveId) {
-        OptionalLong minNextSnapshot = consumerManager.minNextSnapshot();
-        if (minNextSnapshot.isPresent()) {
-            endExclusiveId = Math.min(minNextSnapshot.getAsLong(), endExclusiveId);
-        }
-
         if (endExclusiveId <= earliestId) {
             // No expire happens:
             // write the hint file in order to see the earliest snapshot directly next time
@@ -162,8 +167,6 @@ public class FileStoreExpireImpl implements FileStoreExpire {
             }
         }
 
-        endExclusiveId = Math.min(beginInclusiveId + expireLimit, endExclusiveId);
-
         if (LOG.isDebugEnabled()) {
             LOG.debug(
                     "Snapshot expire range is [" + beginInclusiveId + ", " + endExclusiveId + ")");
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
index ad1ef4c33..d9b3469f9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
@@ -44,7 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base test class for {@link FileStoreExpireImpl}. */
-public class FileStoreExpireTestBase {
+public abstract class FileStoreExpireTestBase {
 
     protected final FileIO fileIO = new LocalFileIO();
     protected TestKeyValueGenerator gen;

Reply via email to