This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 55c038f87 [core] Use binary search to optimize SnapshotManager#earlierThanTimeMills (#2805)
55c038f87 is described below
commit 55c038f87f3d2d5c123fade53826079154a97a11
Author: tsreaper <[email protected]>
AuthorDate: Thu Feb 22 13:30:52 2024 +0800
[core] Use binary search to optimize SnapshotManager#earlierThanTimeMills (#2805)
---
.../org/apache/paimon/utils/SnapshotManager.java | 29 ++++---
.../apache/paimon/utils/SnapshotManagerTest.java | 89 +++++++++++++++++-----
2 files changed, 89 insertions(+), 29 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index e614af7c8..b330fc303 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -181,23 +181,30 @@ public class SnapshotManager implements Serializable {
}
/**
- * Returns a snapshot earlier than the timestamp mills. A non-existent snapshot may be returned
- * if all snapshots are later than the timestamp mills.
+ * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be
+ * returned if all snapshots are equal to or later than the timestamp mills.
*/
public @Nullable Long earlierThanTimeMills(long timestampMills) {
Long earliest = earliestSnapshotId();
Long latest = latestSnapshotId();
+
if (earliest == null || latest == null) {
return null;
}
- for (long i = latest; i >= earliest; i--) {
- long commitTime = snapshot(i).timeMillis();
- if (commitTime < timestampMills) {
- return i;
+ if (snapshot(earliest).timeMillis() >= timestampMills) {
+ return earliest - 1;
+ }
+
+ while (earliest < latest) {
+ long mid = (earliest + latest + 1) / 2;
+ if (snapshot(mid).timeMillis() < timestampMills) {
+ earliest = mid;
+ } else {
+ latest = mid - 1;
}
}
- return earliest - 1;
+ return earliest;
}
/**
@@ -214,7 +221,7 @@ public class SnapshotManager implements Serializable {
if (snapshot(earliest).timeMillis() > timestampMills) {
return null;
}
- Snapshot finnalSnapshot = null;
+ Snapshot finalSnapshot = null;
while (earliest <= latest) {
long mid = earliest + (latest - earliest) / 2; // Avoid overflow
Snapshot snapshot = snapshot(mid);
@@ -223,13 +230,13 @@ public class SnapshotManager implements Serializable {
latest = mid - 1; // Search in the left half
} else if (commitTime < timestampMills) {
earliest = mid + 1; // Search in the right half
- finnalSnapshot = snapshot;
+ finalSnapshot = snapshot;
} else {
- finnalSnapshot = snapshot; // Found the exact match
+ finalSnapshot = snapshot; // Found the exact match
break;
}
}
- return finnalSnapshot;
+ return finalSnapshot;
}
public long snapshotCount() throws IOException {
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
index 9778ba3be..7294e3810 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/SnapshotManagerTest.java
@@ -28,8 +28,12 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
@@ -49,6 +53,52 @@ public class SnapshotManagerTest {
}
}
+ @Test
+ public void testEarlierThanTimeMillis() throws IOException {
+ long base = System.currentTimeMillis();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ int numSnapshots = random.nextInt(1, 20);
+ Set<Long> set = new HashSet<>();
+ while (set.size() < numSnapshots) {
+ set.add(base + random.nextLong(0, 1_000_000));
+ }
+ List<Long> millis = set.stream().sorted().collect(Collectors.toList());
+
+ FileIO localFileIO = LocalFileIO.create();
+ SnapshotManager snapshotManager =
+ new SnapshotManager(localFileIO, new Path(tempDir.toString()));
+ int firstSnapshotId = random.nextInt(1, 100);
+ for (int i = 0; i < numSnapshots; i++) {
+ Snapshot snapshot = createSnapshotWithMillis(firstSnapshotId + i, millis.get(i));
+ localFileIO.writeFileUtf8(
+ snapshotManager.snapshotPath(firstSnapshotId + i), snapshot.toJson());
+ }
+
+ for (int tries = 0; tries < 10; tries++) {
+ long time;
+ if (random.nextBoolean()) {
+ // pick a random time
+ time = base + random.nextLong(0, 1_000_000);
+ } else {
+ // pick a random time equal to one of the snapshots
+ time = millis.get(random.nextInt(numSnapshots));
+ }
+ Long actual = snapshotManager.earlierThanTimeMills(time);
+
+ if (millis.get(numSnapshots - 1) < time) {
+ assertThat(actual).isEqualTo(firstSnapshotId + numSnapshots - 1);
+ } else {
+ for (int i = 0; i < numSnapshots; i++) {
+ if (millis.get(i) >= time) {
+ assertThat(actual).isEqualTo(firstSnapshotId + i - 1);
+ break;
+ }
+ }
+ }
+ }
+ }
+
@Test
public void testEarlierOrEqualTimeMills() throws IOException {
long millis = 1684726826L;
@@ -57,24 +107,7 @@ public class SnapshotManagerTest {
new SnapshotManager(localFileIO, new Path(tempDir.toString()));
// create 10 snapshots
for (long i = 0; i < 10; i++) {
- Snapshot snapshot =
- new Snapshot(
- i,
- 0L,
- null,
- null,
- null,
- null,
- null,
- 0L,
- Snapshot.CommitKind.APPEND,
- millis + i * 1000,
- null,
- null,
- null,
- null,
- null,
- null);
+ Snapshot snapshot = createSnapshotWithMillis(i, millis + i * 1000);
localFileIO.writeFileUtf8(snapshotManager.snapshotPath(i), snapshot.toJson());
}
// smaller than the second snapshot return the first snapshot
@@ -88,6 +121,26 @@ public class SnapshotManagerTest {
.isEqualTo(millis + 1000);
}
+ private Snapshot createSnapshotWithMillis(long id, long millis) {
+ return new Snapshot(
+ id,
+ 0L,
+ null,
+ null,
+ null,
+ null,
+ null,
+ 0L,
+ Snapshot.CommitKind.APPEND,
+ millis,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null);
+ }
+
@Test
public void testTraversalSnapshotsFromLatestSafely() throws IOException, InterruptedException {
FileIO localFileIO = LocalFileIO.create();