This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a33dc6804 [core] Optimize snapshot expire loop for consumers (#2546)
a33dc6804 is described below
commit a33dc68045ffe1e8b351d35ba440b5dfe9b3adb4
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Dec 21 19:01:46 2023 +0800
[core] Optimize snapshot expire loop for consumers (#2546)
This closes #2546.
---
.../paimon/operation/FileStoreExpireImpl.java | 39 ++++++++++++----------
.../paimon/operation/FileStoreExpireTestBase.java | 2 +-
2 files changed, 22 insertions(+), 19 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
index ab6bb5d09..c326eb35e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreExpireImpl.java
@@ -31,7 +31,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
-import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
@@ -112,33 +111,39 @@ public class FileStoreExpireImpl implements FileStoreExpire {
return;
}
- // locate the first snapshot between the numRetainedMax th and (numRetainedMin+1) th latest
- // snapshots to be retained. This snapshot needs to be preserved because it
- // doesn't fulfill the time threshold condition for expiration.
- for (long id = Math.max(latestSnapshotId - numRetainedMax + 1,
earliest);
- id <= latestSnapshotId - numRetainedMin;
- id++) {
+ // the min snapshot to retain from 'snapshot.num-retained.max'
+ // (the maximum number of snapshots to retain)
+ long min = Math.max(latestSnapshotId - numRetainedMax + 1, earliest);
+
+ // the max exclusive snapshot to expire until
+ // protected by 'snapshot.num-retained.min'
+ // (the minimum number of completed snapshots to retain)
+ long maxExclusive = latestSnapshotId - numRetainedMin + 1;
+
+ // the snapshot being read by the consumer cannot be deleted
+ maxExclusive =
+ Math.min(maxExclusive,
consumerManager.minNextSnapshot().orElse(Long.MAX_VALUE));
+
+ // protected by 'snapshot.expire.limit'
+ // (the maximum number of snapshots allowed to expire at a time)
+ maxExclusive = Math.min(maxExclusive, earliest + expireLimit);
+
+ for (long id = min; id < maxExclusive; id++) {
+ // Early exit the loop for 'snapshot.time-retained'
+ // (the maximum time of snapshots to retain)
if (snapshotManager.snapshotExists(id)
&& currentMillis -
snapshotManager.snapshot(id).timeMillis()
<= millisRetained) {
- // within time threshold, can assume that all snapshots after it are also within
- // the threshold
expireUntil(earliest, id);
return;
}
}
- // by default, expire until there are only numRetainedMin snapshots left
- expireUntil(earliest, latestSnapshotId - numRetainedMin + 1);
+ expireUntil(earliest, maxExclusive);
}
@VisibleForTesting
public void expireUntil(long earliestId, long endExclusiveId) {
- OptionalLong minNextSnapshot = consumerManager.minNextSnapshot();
- if (minNextSnapshot.isPresent()) {
- endExclusiveId = Math.min(minNextSnapshot.getAsLong(),
endExclusiveId);
- }
-
if (endExclusiveId <= earliestId) {
// No expire happens:
// write the hint file in order to see the earliest snapshot directly next time
@@ -162,8 +167,6 @@ public class FileStoreExpireImpl implements FileStoreExpire {
}
}
- endExclusiveId = Math.min(beginInclusiveId + expireLimit,
endExclusiveId);
-
if (LOG.isDebugEnabled()) {
LOG.debug(
"Snapshot expire range is [" + beginInclusiveId + ", " +
endExclusiveId + ")");
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
index ad1ef4c33..d9b3469f9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreExpireTestBase.java
@@ -44,7 +44,7 @@ import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
/** Base test class for {@link FileStoreExpireImpl}. */
-public class FileStoreExpireTestBase {
+public abstract class FileStoreExpireTestBase {
protected final FileIO fileIO = new LocalFileIO();
protected TestKeyValueGenerator gen;