This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 77761fb7c IGNITE-17303 RocksDB snapshots might include writes added after snapshot creation start (#924)
77761fb7c is described below
commit 77761fb7c1f7b7c669ea8525741bb4d10a72f030
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Jul 5 19:06:13 2022 +0400
IGNITE-17303 RocksDB snapshots might include writes added after snapshot creation start (#924)
---
.../raft/AbstractClusterStateStorageTest.java | 39 ++++++++++++++++++++++
.../raft/ConcurrentMapClusterStateStorage.java | 16 ++++-----
.../raft/client/service/RaftGroupListener.java | 2 ++
.../rocksdb/snapshot/RocksSnapshotManager.java | 21 +++++++-----
4 files changed, 62 insertions(+), 16 deletions(-)
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
index cc4fe1b80..881defe3f 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/AbstractClusterStateStorageTest.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.Cursor;
@@ -269,4 +270,42 @@ public abstract class AbstractClusterStateStorageTest {
assertThat(storage.get(key1), is(value1));
assertThat(storage.get(key2), is(value2));
}
+
+ /**
+ * Tests that writes coming after a snapshot is started do not get reflected in the snapshot.
+ *
+ * @throws Exception If something goes wrong.
+ */
+ @Test
+ void snapshotShouldNotContainWritesAddedAfterItsStart() throws Exception {
+ final int entriesInSnapshot = 100_000;
+
+ for (int i = 0; i < entriesInSnapshot; i++) {
+ putKeyValue(i);
+ }
+
+ Path snapshotDirPath = workDir.resolve("snapshot");
+ Files.createDirectories(snapshotDirPath);
+
+ CompletableFuture<Void> snapshotFuture = storage.snapshot(snapshotDirPath);
+
+ for (int i = entriesInSnapshot; i < entriesInSnapshot + 1000; i++) {
+ putKeyValue(i);
+ }
+
+ snapshotFuture.join();
+
+ storage.restoreSnapshot(snapshotDirPath);
+
+ byte[] keyAddedAfterSnapshotStart = key(entriesInSnapshot);
+ assertThat(storage.get(keyAddedAfterSnapshotStart), is(nullValue()));
+ }
+
+ private void putKeyValue(int n) {
+ storage.put(key(n), ("value" + n).getBytes(UTF_8));
+ }
+
+ private byte[] key(int n) {
+ return ("key" + n).getBytes(UTF_8);
+ }
}
diff --git a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
index ef89843ef..5681d7d03 100644
--- a/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
+++ b/modules/cluster-management/src/test/java/org/apache/ignite/internal/cluster/management/raft/ConcurrentMapClusterStateStorage.java
@@ -111,16 +111,16 @@ public class ConcurrentMapClusterStateStorage implements ClusterStateStorage {
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
- return CompletableFuture.runAsync(() -> {
- try (var out = new ObjectOutputStream(Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
- var keys = new ArrayList<byte[]>(map.size());
- var values = new ArrayList<byte[]>(map.size());
+ var keys = new ArrayList<byte[]>(map.size());
+ var values = new ArrayList<byte[]>(map.size());
- map.forEach((k, v) -> {
- keys.add(k.bytes());
- values.add(v);
- });
+ map.forEach((k, v) -> {
+ keys.add(k.bytes());
+ values.add(v);
+ });
+ return CompletableFuture.runAsync(() -> {
+ try (var out = new ObjectOutputStream(Files.newOutputStream(snapshotPath.resolve(SNAPSHOT_FILE)))) {
out.writeObject(keys);
out.writeObject(values);
} catch (Exception e) {
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
index 8035d392f..c23f842db 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/RaftGroupListener.java
@@ -54,6 +54,8 @@ public interface RaftGroupListener {
/**
* The callback to save a snapshot. The execution should be asynchronous to avoid blocking of STM updates.
+ * But the snapshot coordinates (or copy-of-data-to-include-in-snapshot) must be taken synchronously before starting the asynchronous
+ * snapshotting process.
*
* @param path Snapshot directory to store data.
* @param doneClo The closure to call on finish. Pass the not null exception if the snapshot has not been created or null on successful
diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
index 6c8728804..b5a204526 100644
--- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
+++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/snapshot/RocksSnapshotManager.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
+import java.util.function.Function;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteInternalException;
@@ -77,8 +78,11 @@ public class RocksSnapshotManager {
public CompletableFuture<Void> createSnapshot(Path snapshotDir) {
Path tmpPath = Paths.get(snapshotDir.toString() + TMP_SUFFIX);
- return CompletableFuture.supplyAsync(db::getSnapshot, executor)
- .thenComposeAsync(snapshot -> {
+ // The snapshot reference must be taken synchronously, otherwise we might let more writes sneak into the snapshot than needed.
+ Snapshot snapshot = db.getSnapshot();
+
+ return CompletableFuture.supplyAsync(
+ () -> {
createTmpSnapshotDir(tmpPath);
// Create futures for capturing SST snapshots of the column families
@@ -86,19 +90,20 @@ public class RocksSnapshotManager {
.map(cf -> createSstFileAsync(cf, snapshot, tmpPath))
.toArray(CompletableFuture[]::new);
- return CompletableFuture.allOf(sstFutures).thenApply(v -> snapshot);
+ return CompletableFuture.allOf(sstFutures);
}, executor)
- .whenCompleteAsync((snapshot, e) -> {
- if (e != null) {
- return;
- }
-
+ .thenCompose(Function.identity())
+ .whenCompleteAsync((ignored, e) -> {
db.releaseSnapshot(snapshot);
// Snapshot is not actually closed here, because a Snapshot instance doesn't own a pointer, the
// database does. Calling close to maintain the AutoCloseable semantics
snapshot.close();
+ if (e != null) {
+ return;
+ }
+
// Delete snapshot directory if it already exists
IgniteUtils.deleteIfExists(snapshotDir);