This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d36d92b795 IGNITE-23409 Merge implementations of KeyValueStorage (#4546)
d36d92b795 is described below
commit d36d92b795bc429edfc67f7d6969c3f5651eba83
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Tue Oct 15 09:46:11 2024 +0300
IGNITE-23409 Merge implementations of KeyValueStorage (#4546)
---
.../impl/ItMetaStorageMaintenanceTest.java | 3 +-
.../server/AbstractKeyValueStorage.java | 368 ++++++++++++++
.../server/persistence/RocksDbKeyValueStorage.java | 373 ++-------------
.../AbstractCompactionKeyValueStorageTest.java | 106 +++++
.../server/SimpleInMemoryKeyValueStorage.java | 527 +++++++--------------
.../internal/test/WatchListenerInhibitor.java | 9 +-
6 files changed, 679 insertions(+), 707 deletions(-)
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
index 6705f543e1..b4e45c0ec1 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMaintenanceTest.java
@@ -35,6 +35,7 @@ import
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.WatchProcessor;
import org.apache.ignite.internal.metastorage.server.time.ClusterTime;
@@ -185,7 +186,7 @@ class ItMetaStorageMaintenanceTest extends
ItMetaStorageMultipleNodesAbstractTes
// TODO: IGNITE-15723 After a component factory is implemented, need to get rid of reflection here.
var storage = (SimpleInMemoryKeyValueStorage)
getFieldValue(node.metaStorageManager, MetaStorageManagerImpl.class, "storage");
- var watchProcessor = (WatchProcessor) getFieldValue(storage,
SimpleInMemoryKeyValueStorage.class, "watchProcessor");
+ var watchProcessor = (WatchProcessor) getFieldValue(storage,
AbstractKeyValueStorage.class, "watchProcessor");
CompletableFuture<Void> notificationFuture =
getFieldValue(watchProcessor, WatchProcessor.class, "notificationFuture");
if (notificationFuture != null) {
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
new file mode 100644
index 0000000000..e694a9249a
--- /dev/null
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorage.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.server;
+
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.NOT_FOUND;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.isLastIndex;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
+import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.minRevisionIndex;
+import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
+import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.LongConsumer;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.failure.FailureManager;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
+import org.apache.ignite.internal.metastorage.WatchListener;
+import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
+import org.apache.ignite.internal.metastorage.impl.EntryImpl;
+import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Abstract implementation of {@link KeyValueStorage}.
+ *
+ * <p>Keeps the state shared by storage implementations (current revision, compaction revision, recovery revision listener,
+ * watch processor) and implements the read and watch-registration methods on top of two abstract lookups:
+ * {@link #keyRevisionsForOperation(byte[])} and {@link #valueForOperation(byte[], long)}.</p>
+ */
+public abstract class AbstractKeyValueStorage implements KeyValueStorage {
+ /** Key comparator: lexicographic comparison of byte arrays with elements treated as unsigned. */
+ protected static final Comparator<byte[]> KEY_COMPARATOR =
Arrays::compareUnsigned;
+
+ /** Read-write lock that guards multi-threaded access to the mutable fields of the storage. */
+ protected final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ /** Watch processor, watches and revision update listeners are registered through it. */
+ protected final WatchProcessor watchProcessor;
+
+ /**
+ * Revision listener for recovery only. Notifies {@link
MetaStorageManagerImpl} of revision update.
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
+ private @Nullable LongConsumer recoveryRevisionListener;
+
+ /**
+ * Revision. Will be incremented for each single-entry or multi-entry
update operation.
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
+ protected long rev;
+
+ /**
+ * Last compaction revision that was set or restored from a snapshot.
+ *
+ * <p>This field is used by metastorage read methods to determine whether
{@link CompactedException} should be thrown.</p>
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
+ protected long compactionRevision = -1;
+
+ /** Flag raised by {@link #stopCompaction()}; this class only sets it and never reads it. */
+ protected final AtomicBoolean stopCompaction = new AtomicBoolean();
+
+ /**
+ * Constructor.
+ *
+ * @param nodeName Node name.
+ * @param failureManager Failure processor that is used to handle critical
errors.
+ */
+ protected AbstractKeyValueStorage(String nodeName, FailureManager
failureManager) {
+ this.watchProcessor = new WatchProcessor(nodeName, this::get,
failureManager);
+ }
+
+ /**
+ * Returns the key revisions for operation, an empty array if not found.
+ *
+ * <p>Called by the {@code doGet} methods of this class. NOTE(review): the result is passed to the index helpers
+ * ({@code minRevisionIndex}, {@code maxRevisionIndex}, {@code isLastIndex}), so it is presumably sorted in ascending order -
+ * confirm against the implementations.</p>
+ */
+ protected abstract long[] keyRevisionsForOperation(byte[] key);
+
+ /**
+ * Returns the value of the key at the given revision for operation.
+ *
+ * <p>Within this class it is called only with revisions taken from {@link #keyRevisionsForOperation(byte[])}.</p>
+ */
+ protected abstract Value valueForOperation(byte[] key, long revision);
+
+ @Override
+ public Entry get(byte[] key) {
+ rwLock.readLock().lock();
+
+ try {
+ // The current revision is the upper bound, it is read under the same read lock as the entry.
+ return doGet(key, rev);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Entry get(byte[] key, long revUpperBound) {
+ rwLock.readLock().lock();
+
+ try {
+ return doGet(key, revUpperBound);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound)
{
+ rwLock.readLock().lock();
+
+ try {
+ return doGet(key, revLowerBound, revUpperBound);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public List<Entry> getAll(List<byte[]> keys) {
+ rwLock.readLock().lock();
+
+ try {
+ // All keys are read with the same upper bound: the current revision.
+ return doGetAll(keys, rev);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
+ rwLock.readLock().lock();
+
+ try {
+ return doGetAll(keys, revUpperBound);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
+ rwLock.readLock().lock();
+
+ try {
+ // Delegates to the three-argument overload, which is not defined in this class, bounded by the current revision.
+ return range(keyFrom, keyTo, rev);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public long revision() {
+ rwLock.readLock().lock();
+
+ try {
+ // The field is not volatile, so it is read under the lock.
+ return rev;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void setCompactionRevision(long revision) {
+ assert revision >= 0 : revision;
+
+ rwLock.writeLock().lock();
+
+ try {
+ // Helper from KeyValueStorageUtils; judging by its name it asserts that the new compaction revision is less than the
+ // current revision.
+ assertCompactionRevisionLessThanCurrent(revision, rev);
+
+ compactionRevision = revision;
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public long getCompactionRevision() {
+ rwLock.readLock().lock();
+
+ try {
+ return compactionRevision;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ @Override
+ public void stopCompaction() {
+ // Only raises the flag, no lock is taken.
+ stopCompaction.set(true);
+ }
+
+ @Override
+ public byte @Nullable [] nextKey(byte[] key) {
+ // Delegates to RocksUtils#incrementPrefix; the result may be null as declared by the return type.
+ return incrementPrefix(key);
+ }
+
+ @Override
+ public void registerRevisionUpdateListener(RevisionUpdateListener
listener) {
+ watchProcessor.registerRevisionUpdateListener(listener);
+ }
+
+ @Override
+ public void unregisterRevisionUpdateListener(RevisionUpdateListener
listener) {
+ watchProcessor.unregisterRevisionUpdateListener(listener);
+ }
+
+ @Override
+ public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long
newRevision) {
+ return watchProcessor.notifyUpdateRevisionListeners(newRevision);
+ }
+
+ @Override
+ public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
+ // The listener field is guarded by the lock, hence the write lock for a plain assignment.
+ rwLock.writeLock().lock();
+
+ try {
+ this.recoveryRevisionListener = listener;
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ @Override
+ public void removeWatch(WatchListener listener) {
+ watchProcessor.removeWatch(listener);
+ }
+
+ @Override
+ public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev,
WatchListener listener) {
+ assert rev > 0 : rev;
+
+ // keyFrom is inclusive; keyTo, when present, is exclusive. A null keyTo leaves the range unbounded from above.
+ Predicate<byte[]> rangePredicate = keyTo == null
+ ? k -> KEY_COMPARATOR.compare(keyFrom, k) <= 0
+ : k -> KEY_COMPARATOR.compare(keyFrom, k) <= 0 &&
KEY_COMPARATOR.compare(keyTo, k) > 0;
+
+ watchProcessor.addWatch(new Watch(rev, listener, rangePredicate));
+ }
+
+ @Override
+ public void watchExact(Collection<byte[]> keys, long rev, WatchListener
listener) {
+ assert rev > 0 : rev;
+ assert !keys.isEmpty();
+
+ // Arrays have identity-based equals/hashCode, so the keys are matched by content through a sorted set with KEY_COMPARATOR.
+ TreeSet<byte[]> keySet = new TreeSet<>(KEY_COMPARATOR);
+
+ keySet.addAll(keys);
+
+ Predicate<byte[]> inPredicate = keySet::contains;
+
+ watchProcessor.addWatch(new Watch(rev, listener, inPredicate));
+ }
+
+ @Override
+ public void watchExact(byte[] key, long rev, WatchListener listener) {
+ assert rev > 0 : rev;
+
+ // Content comparison of the key bytes.
+ Predicate<byte[]> exactPredicate = k -> KEY_COMPARATOR.compare(k, key)
== 0;
+
+ watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
+ }
+
+ /** Notifies of revision update. Must be called under the {@link #rwLock}.
*/
+ protected void notifyRevisionUpdate() {
+ if (recoveryRevisionListener != null) {
+ // Listener must be invoked only on recovery, after recovery listener must be null.
+ recoveryRevisionListener.accept(rev);
+ }
+ }
+
+ /**
+ * Returns the entry of the key for the revision selected by {@code maxRevisionIndex} for the given upper bound.
+ *
+ * <p>Does not take {@link #rwLock} itself, the {@code get} and {@code getAll} methods of this class call it under the read
+ * lock.</p>
+ *
+ * @param key Key.
+ * @param revUpperBound Upper bound of the revision, must be non-negative.
+ * @return Entry built from the selected revision and its value, or {@link EntryImpl#empty(byte[])} if no revision was selected.
+ * @throws CompactedException If {@code revUpperBound} is less than or equal to {@link #compactionRevision} and the selected
+ *      revision is not the last one of the key or its value is a tombstone; may also be thrown by the compaction check when no
+ *      revision was selected.
+ */
+ protected Entry doGet(byte[] key, long revUpperBound) {
+ assert revUpperBound >= 0 : revUpperBound;
+
+ long[] keyRevisions = keyRevisionsForOperation(key);
+ int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
+
+ if (maxRevisionIndex == NOT_FOUND) {
+
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
compactionRevision);
+
+ return EntryImpl.empty(key);
+ }
+
+ long revision = keyRevisions[maxRevisionIndex];
+
+ Value value = valueForOperation(key, revision);
+
+ // At or below the compaction revision only the last revision of the key with a non-tombstone value is still readable.
+ if (revUpperBound <= compactionRevision && (!isLastIndex(keyRevisions,
maxRevisionIndex) || value.tombstone())) {
+ throw new CompactedException(revUpperBound, compactionRevision);
+ }
+
+ return EntryImpl.toEntry(key, revision, value);
+ }
+
+ /**
+ * Returns the entries of the key for its revisions between the indexes selected by {@code minRevisionIndex} and
+ * {@code maxRevisionIndex} for the given bounds.
+ *
+ * <p>Revisions less than or equal to {@link #compactionRevision} are skipped, except for the last revision of the key when its
+ * value is not a tombstone.</p>
+ *
+ * @param key Key.
+ * @param revLowerBound Lower bound of the revision, must be non-negative.
+ * @param revUpperBound Upper bound of the revision, must be greater than or equal to the lower bound.
+ * @return Entries in the order of the key revisions array, possibly empty.
+ * @throws CompactedException May be thrown by the compaction check against {@code revLowerBound} when no entry was collected.
+ */
+ private List<Entry> doGet(byte[] key, long revLowerBound, long
revUpperBound) {
+ assert revLowerBound >= 0 : revLowerBound;
+ assert revUpperBound >= 0 : revUpperBound;
+ assert revUpperBound >= revLowerBound : "revLowerBound=" +
revLowerBound + ", revUpperBound=" + revUpperBound;
+
+ long[] keyRevisions = keyRevisionsForOperation(key);
+
+ int minRevisionIndex = minRevisionIndex(keyRevisions, revLowerBound);
+ int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
+
+ if (minRevisionIndex == NOT_FOUND || maxRevisionIndex == NOT_FOUND) {
+
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
compactionRevision);
+
+ return List.of();
+ }
+
+ var entries = new ArrayList<Entry>();
+
+ for (int i = minRevisionIndex; i <= maxRevisionIndex; i++) {
+ long revision = keyRevisions[i];
+
+ Value value;
+
+ // More complex check to read less from disk.
+ // Optimization for persistent storage.
+ if (revision <= compactionRevision) {
+ // The value is not read at all for a revision that is skipped because it is not the last one of the key.
+ if (!isLastIndex(keyRevisions, i)) {
+ continue;
+ }
+
+ value = valueForOperation(key, revision);
+
+ if (value.tombstone()) {
+ continue;
+ }
+ } else {
+ value = valueForOperation(key, revision);
+ }
+
+ entries.add(EntryImpl.toEntry(key, revision, value));
+ }
+
+ // Every candidate revision was skipped: run the compaction check before returning the empty result.
+ if (entries.isEmpty()) {
+
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
compactionRevision);
+ }
+
+ return entries;
+ }
+
+ /**
+ * Returns the entries of the keys, one per key and in the order of {@code keys}, each read by {@link #doGet(byte[], long)}.
+ *
+ * @param keys Keys, must not be empty.
+ * @param revUpperBound Upper bound of the revision, must be non-negative.
+ */
+ private List<Entry> doGetAll(List<byte[]> keys, long revUpperBound) {
+ assert !keys.isEmpty();
+ assert revUpperBound >= 0 : revUpperBound;
+
+ var res = new ArrayList<Entry>(keys.size());
+
+ for (byte[] key : keys) {
+ res.add(doGet(key, revUpperBound));
+ }
+
+ return res;
+ }
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index bf743489de..b1c60c9f50 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -23,9 +23,7 @@ import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
-import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.isLastIndex;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
-import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.minRevisionIndex;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import static
org.apache.ignite.internal.metastorage.server.persistence.RocksStorageUtils.appendLong;
@@ -45,7 +43,6 @@ import static
org.apache.ignite.internal.metastorage.server.persistence.StorageC
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.REVISION_TO_TS;
import static
org.apache.ignite.internal.metastorage.server.persistence.StorageColumnFamilyType.TS_TO_REVISION;
import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
-import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static
org.apache.ignite.internal.rocksdb.snapshot.ColumnFamilyRange.fullRange;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
@@ -62,23 +59,15 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.LongConsumer;
-import java.util.function.Predicate;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
@@ -86,8 +75,6 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
-import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.OperationType;
import org.apache.ignite.internal.metastorage.dsl.Operations;
@@ -96,7 +83,7 @@ import org.apache.ignite.internal.metastorage.dsl.Update;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite.internal.metastorage.impl.EntryImpl;
-import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.If;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
@@ -104,8 +91,6 @@ import
org.apache.ignite.internal.metastorage.server.MetastorageChecksum;
import org.apache.ignite.internal.metastorage.server.OnRevisionAppliedCallback;
import org.apache.ignite.internal.metastorage.server.Statement;
import org.apache.ignite.internal.metastorage.server.Value;
-import org.apache.ignite.internal.metastorage.server.Watch;
-import org.apache.ignite.internal.metastorage.server.WatchProcessor;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.rocksdb.RocksUtils;
@@ -145,7 +130,7 @@ import org.rocksdb.WriteOptions;
* The mapping from the key to the set of the storage's revisions is stored in
the "index" column family. A key represents the key of an
* entry and the value is a {@code byte[]} that represents a {@code long[]}
where every item is a revision of the storage.
*/
-public class RocksDbKeyValueStorage implements KeyValueStorage {
+public class RocksDbKeyValueStorage extends AbstractKeyValueStorage {
private static final IgniteLogger LOG =
Loggers.forClass(RocksDbKeyValueStorage.class);
/** A revision to store with system entries. */
@@ -163,9 +148,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
"SYSTEM_COMPACTION_REVISION_KEY".getBytes(UTF_8)
);
- /** Lexicographic order comparator. */
- private static final Comparator<byte[]> CMP = Arrays::compareUnsigned;
-
/** Batch size (number of keys) for storage compaction. The value is
arbitrary. */
private static final int COMPACT_BATCH_SIZE = 10;
@@ -173,9 +155,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
RocksDB.loadLibrary();
}
- /** RW lock. */
- private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
-
/** Thread-pool for snapshot operations execution. */
private final ExecutorService snapshotExecutor;
@@ -206,19 +185,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
/** Snapshot manager. */
private volatile RocksSnapshotManager snapshotManager;
- /**
- * Revision listener for recovery only. Notifies {@link
MetaStorageManagerImpl} of revision update.
- * Guarded by {@link #rwLock}.
- */
- private @Nullable LongConsumer recoveryRevisionListener;
-
- /**
- * Revision. Will be incremented for each single-entry or multi-entry
update operation.
- *
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
- */
- private long rev;
-
/**
* Facility to work with checksums.
*
@@ -226,18 +192,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
*/
private MetastorageChecksum checksum;
- /**
- * Last compaction revision that was set or restored from a snapshot.
- *
- * <p>This field is used by metastorage read methods to determine whether
{@link CompactedException} should be thrown.</p>
- *
- * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
- */
- private long compactionRevision = -1;
-
- /** Watch processor. */
- private final WatchProcessor watchProcessor;
-
/** Status of the watch recovery process. */
private enum RecoveryStatus {
INITIAL,
@@ -255,7 +209,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
* Buffer used to cache new events while an event replay is in progress.
After replay finishes, the cache gets drained and is never
* used again.
*
- * <p>Multi-threaded access is guarded by {@link #rwLock}.
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
@Nullable
private List<UpdatedEntries> eventCache;
@@ -263,7 +217,9 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
/**
* Current list of updated entries.
*
- * <p>Since this list gets read and updated only on writes (under a write
lock), no extra synchronisation is needed.
+ * <p>Since this list gets read and updated only on writes (under a write
lock), no extra synchronisation is needed.</p>
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
private final UpdatedEntries updatedEntries = new UpdatedEntries();
@@ -273,18 +229,17 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
/** Metastorage recovery is based on the snapshot & external log. WAL is
never used for recovery, and can be safely disabled. */
private final WriteOptions defaultWriteOptions = new
WriteOptions().setDisableWAL(true);
- private final AtomicBoolean stopCompaction = new AtomicBoolean();
-
/**
* Constructor.
*
* @param nodeName Node name.
* @param dbPath RocksDB path.
+ * @param failureManager Failure processor that is used to handle critical
errors.
*/
public RocksDbKeyValueStorage(String nodeName, Path dbPath, FailureManager
failureManager) {
- this.dbPath = dbPath;
+ super(nodeName, failureManager);
- this.watchProcessor = new WatchProcessor(nodeName, this::get,
failureManager);
+ this.dbPath = dbPath;
this.snapshotExecutor = Executors.newFixedThreadPool(2,
NamedThreadFactory.create(nodeName, "metastorage-snapshot-executor", LOG));
}
@@ -412,17 +367,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
return bytesToLong(bytes);
}
- /**
- * Notifies of revision update.
- * Must be called under the {@link #rwLock}.
- */
- private void notifyRevisionUpdate() {
- if (recoveryRevisionListener != null) {
- // Listener must be invoked only on recovery, after recovery listener must be null.
- recoveryRevisionListener.accept(rev);
- }
- }
-
/**
* Clear the RocksDB instance.
*
@@ -459,7 +403,13 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
- return snapshotManager.createSnapshot(snapshotPath);
+ rwLock.writeLock().lock();
+
+ try {
+ return snapshotManager.createSnapshot(snapshotPath);
+ } finally {
+ rwLock.writeLock().unlock();
+ }
}
@Override
@@ -492,17 +442,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public long revision() {
- rwLock.readLock().lock();
-
- try {
- return rev;
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
@Override
public void put(byte[] key, byte[] value, HybridTimestamp opTs) {
rwLock.writeLock().lock();
@@ -599,12 +538,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
return longToBytes(ts.longValue());
}
- private static Entry entry(byte[] key, long revision, Value value) {
- return value.tombstone()
- ? EntryImpl.tombstone(key, revision,
value.operationTimestamp())
- : new EntryImpl(key, value.bytes(), revision,
value.operationTimestamp());
- }
-
@Override
public void putAll(List<byte[]> keys, List<byte[]> values, HybridTimestamp
opTs) {
rwLock.writeLock().lock();
@@ -628,61 +561,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public Entry get(byte[] key) {
- rwLock.readLock().lock();
-
- try {
- return doGet(key, rev);
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- @Override
- public Entry get(byte[] key, long revUpperBound) {
- rwLock.readLock().lock();
-
- try {
- return doGet(key, revUpperBound);
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- @Override
- public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound)
{
- rwLock.readLock().lock();
-
- try {
- return doGet(key, revLowerBound, revUpperBound);
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- @Override
- public List<Entry> getAll(List<byte[]> keys) {
- rwLock.readLock().lock();
-
- try {
- return doGetAll(keys, rev);
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- @Override
- public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
- rwLock.readLock().lock();
-
- try {
- return doGetAll(keys, revUpperBound);
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
@Override
public void remove(byte[] key, HybridTimestamp opTs) {
rwLock.writeLock().lock();
@@ -884,45 +762,9 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev,
WatchListener listener) {
- assert keyFrom != null : "keyFrom couldn't be null.";
- assert rev > 0 : "rev must be positive.";
-
- Predicate<byte[]> rangePredicate = keyTo == null
- ? k -> CMP.compare(keyFrom, k) <= 0
- : k -> CMP.compare(keyFrom, k) <= 0 && CMP.compare(keyTo, k) >
0;
-
- watchProcessor.addWatch(new Watch(rev, listener, rangePredicate));
- }
-
- @Override
- public void watchExact(byte[] key, long rev, WatchListener listener) {
- assert key != null : "key couldn't be null.";
- assert rev > 0 : "rev must be positive.";
-
- Predicate<byte[]> exactPredicate = k -> CMP.compare(k, key) == 0;
-
- watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
- }
-
- @Override
- public void watchExact(Collection<byte[]> keys, long rev, WatchListener
listener) {
- assert keys != null && !keys.isEmpty() : "keys couldn't be null or
empty: " + keys;
- assert rev > 0 : "rev must be positive.";
-
- TreeSet<byte[]> keySet = new TreeSet<>(CMP);
-
- keySet.addAll(keys);
-
- Predicate<byte[]> inPredicate = keySet::contains;
-
- watchProcessor.addWatch(new Watch(rev, listener, inPredicate));
- }
-
@Override
public void startWatches(long startRevision, OnRevisionAppliedCallback
revisionCallback) {
- assert startRevision != 0 : "First meaningful revision is 1";
+ assert startRevision > 0 : startRevision;
long currentRevision;
@@ -950,11 +792,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public void removeWatch(WatchListener listener) {
- watchProcessor.removeWatch(listener);
- }
-
@Override
public void compact(long revision) {
assert revision >= 0 : revision;
@@ -968,16 +805,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public void stopCompaction() {
- stopCompaction.set(true);
- }
-
- @Override
- public byte @Nullable [] nextKey(byte[] key) {
- return incrementPrefix(key);
- }
-
/**
* Adds a key to a batch marking the value as a tombstone.
*
@@ -1044,90 +871,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- private List<Entry> doGetAll(Collection<byte[]> keys, long revUpperBound) {
- assert !keys.isEmpty();
- assert revUpperBound >= 0 : revUpperBound;
-
- var res = new ArrayList<Entry>(keys.size());
-
- for (byte[] key : keys) {
- res.add(doGet(key, revUpperBound));
- }
-
- return res;
- }
-
- private Entry doGet(byte[] key, long revUpperBound) {
- assert revUpperBound >= 0 : revUpperBound;
-
- long[] keyRevisions = getRevisionsForOperation(key);
- int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
-
- if (maxRevisionIndex == NOT_FOUND) {
-
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
compactionRevision);
-
- return EntryImpl.empty(key);
- }
-
- long revision = keyRevisions[maxRevisionIndex];
-
- Value value = getValueForOperation(key, revision);
-
- if (revUpperBound <= compactionRevision && (!isLastIndex(keyRevisions,
maxRevisionIndex) || value.tombstone())) {
- throw new CompactedException(revUpperBound, compactionRevision);
- }
-
- return EntryImpl.toEntry(key, revision, value);
- }
-
- private List<Entry> doGet(byte[] key, long revLowerBound, long
revUpperBound) {
- assert revLowerBound >= 0 : revLowerBound;
- assert revUpperBound >= 0 : revUpperBound;
- assert revUpperBound >= revLowerBound : "revLowerBound=" +
revLowerBound + ", revUpperBound=" + revUpperBound;
-
- long[] keyRevisions = getRevisionsForOperation(key);
-
- int minRevisionIndex = minRevisionIndex(keyRevisions, revLowerBound);
- int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
-
- if (minRevisionIndex == NOT_FOUND || maxRevisionIndex == NOT_FOUND) {
-
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
compactionRevision);
-
- return List.of();
- }
-
- var entries = new ArrayList<Entry>();
-
- for (int i = minRevisionIndex; i <= maxRevisionIndex; i++) {
- long revision = keyRevisions[i];
-
- Value value;
-
- // More complex check to read less from disk.
- if (revision <= compactionRevision) {
- if (!isLastIndex(keyRevisions, i)) {
- continue;
- }
-
- value = getValueForOperation(key, revision);
-
- if (value.tombstone()) {
- continue;
- }
- } else {
- value = getValueForOperation(key, revision);
- }
-
- entries.add(EntryImpl.toEntry(key, revision, value));
- }
-
- if (entries.isEmpty()) {
-
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
compactionRevision);
- }
-
- return entries;
- }
-
/**
* Returns array of revisions of the entry corresponding to the key.
*
@@ -1144,20 +887,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
return getAsLongs(revisions);
}
- /**
- * Returns array of revisions of the entry corresponding to the key.
- *
- * @param key Key.
- * @throws MetaStorageException If there was an error while getting the
revisions for the key.
- */
- private long[] getRevisionsForOperation(byte[] key) {
- try {
- return getRevisions(key);
- } catch (RocksDBException e) {
- throw new MetaStorageException(OP_EXECUTION_ERR, "Failed to get
revisions for the key: " + toUtf8String(key), e);
- }
- }
-
/**
* Adds an entry to the batch.
*
@@ -1181,7 +910,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
data.put(batch, rocksKey, rocksValue);
- updatedEntries.add(entry(key, curRev, new Value(value, opTs)));
+ updatedEntries.add(EntryImpl.toEntry(key, curRev, new Value(value,
opTs)));
}
/**
@@ -1297,7 +1026,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
ts = hybridTimestamp(timestampFromRocksValue(rocksValue));
}
- updatedEntries.add(entry(rocksKeyToBytes(rocksKey), revision,
bytesToValue(rocksValue)));
+
updatedEntries.add(EntryImpl.toEntry(rocksKeyToBytes(rocksKey), revision,
bytesToValue(rocksValue)));
}
try {
@@ -1384,17 +1113,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
- rwLock.writeLock().lock();
-
- try {
- this.recoveryRevisionListener = listener;
- } finally {
- rwLock.writeLock().unlock();
- }
- }
-
@TestOnly
public Path getDbPath() {
return dbPath;
@@ -1440,21 +1158,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public void registerRevisionUpdateListener(RevisionUpdateListener
listener) {
- watchProcessor.registerRevisionUpdateListener(listener);
- }
-
- @Override
- public void unregisterRevisionUpdateListener(RevisionUpdateListener
listener) {
- watchProcessor.unregisterRevisionUpdateListener(listener);
- }
-
- @Override
- public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long
newRevision) {
- return watchProcessor.notifyUpdateRevisionListeners(newRevision);
- }
-
@Override
public void advanceSafeTime(HybridTimestamp newSafeTime) {
rwLock.writeLock().lock();
@@ -1487,32 +1190,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- @Override
- public void setCompactionRevision(long revision) {
- assert revision >= 0 : revision;
-
- rwLock.writeLock().lock();
-
- try {
- assertCompactionRevisionLessThanCurrent(revision, rev);
-
- compactionRevision = revision;
- } finally {
- rwLock.writeLock().unlock();
- }
- }
-
- @Override
- public long getCompactionRevision() {
- rwLock.readLock().lock();
-
- try {
- return compactionRevision;
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
@Override
public long checksum(long revision) {
rwLock.readLock().lock();
@@ -1621,7 +1298,17 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- private Value getValueForOperation(byte[] key, long revision) {
+ @Override
+ protected long[] keyRevisionsForOperation(byte[] key) {
+ try {
+ return getRevisions(key);
+ } catch (RocksDBException e) {
+ throw new MetaStorageException(OP_EXECUTION_ERR, "Failed to get
revisions for the key: " + toUtf8String(key), e);
+ }
+ }
+
+ @Override
+ protected Value valueForOperation(byte[] key, long revision) {
Value value = getValueForOperationNullable(key, revision);
assert value != null : "key=" + toUtf8String(key) + ", revision=" +
revision;
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index 956382ca5b..805e0bab04 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -108,6 +108,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(4, 6/* Tombstone */), collectRevisions(SOME_KEY));
}
+ /**
+ * Tests {@link KeyValueStorage#compact(long)} for a specific single
revision, to simplify testing, see examples in the method
+ * description. Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testCompactRevision1() {
storage.compact(1);
@@ -117,6 +121,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
}
+ /**
+ * Tests {@link KeyValueStorage#compact(long)} for a specific single
revision, to simplify testing, see examples in the method
+ * description. Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testCompactRevision2() {
storage.compact(2);
@@ -126,6 +134,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
}
+ /**
+ * Tests {@link KeyValueStorage#compact(long)} for a specific single
revision, to simplify testing, see examples in the method
+ * description. Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testCompactRevision3() {
storage.compact(3);
@@ -135,6 +147,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(4, 6), collectRevisions(SOME_KEY));
}
+ /**
+ * Tests {@link KeyValueStorage#compact(long)} for a specific single
revision, to simplify testing, see examples in the method
+ * description. Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testCompactRevision4() {
storage.compact(4);
@@ -144,6 +160,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(6), collectRevisions(SOME_KEY));
}
+ /**
+ * Tests {@link KeyValueStorage#compact(long)} for a specific single
revision, to simplify testing, see examples in the method
+ * description. Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testCompactRevision5() {
storage.compact(5);
@@ -153,6 +173,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(6), collectRevisions(SOME_KEY));
}
+ /**
+ * Tests {@link KeyValueStorage#compact(long)} for a specific single
revision, to simplify testing, see examples in the method
+ * description. Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testCompactRevision6() {
storage.compact(6);
@@ -162,6 +186,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(), collectRevisions(SOME_KEY));
}
+ /**
+ * Tests {@link KeyValueStorage#compact(long)} as if it were called for
each revision sequentially, see examples in the method
+ * description. Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testCompactRevisionSequentially() {
testCompactRevision1();
@@ -172,6 +200,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
testCompactRevision6();
}
+ /**
+ * Tests that after the storage is recovered, compacted keys will not be
returned. Keys with their revisions are added in
+ * {@link #setUp()}.
+ */
@Test
void testRevisionsAfterRestart() {
storage.compact(6);
@@ -187,6 +219,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(List.of(), collectRevisions(SOME_KEY));
}
+ /**
+ * Tests stopping the compaction. Since it is impossible to predict what
the result will be if you stop somewhere in the middle of the
+ * compaction, it is easiest to stop before the compaction starts. Keys
with their revisions are added in {@link #setUp()}.
+ */
@Test
void testCompactBeforeStopIt() {
storage.stopCompaction();
@@ -281,6 +317,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertEquals(-1, storage.getCompactionRevision());
}
+ /**
+ * Tests {@link Entry#timestamp()} for a key that will be fully removed
from storage after compaction. This case would be suitable for
+ * the {@link #BAR_KEY}, since its last revision is a tombstone. Keys with
their revisions are added in {@link #setUp()}.
+ */
@Test
void testEntryOperationTimestampAfterCompaction() {
storage.compact(6);
@@ -340,6 +380,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertThrows(CompactedException.class, () ->
storage.revisionByTimestamp(timestamp3.subtractPhysicalTime(1)));
}
+ /**
+ * Tests that {@link KeyValueStorage#get(byte[])} will not throw the
{@link CompactedException} for all keys after compacting to the
+ * penultimate repository revision. Keys with their revisions are added in
{@link #setUp()}.
+ */
@Test
void testGetSingleEntryLatestAndCompaction() {
storage.setCompactionRevision(6);
@@ -349,6 +393,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrow(() -> storage.get(NOT_EXISTS_KEY));
}
+ /**
+ * Tests {@link KeyValueStorage#get(byte[], long)} using examples from the
description only for the {@link #FOO_KEY} for which the last
+ * revision is <b>not</b> tombstone. Only one key is considered so that
the tests are not too long. Keys with their revisions are
+ * added in {@link #setUp()}.
+ */
@Test
void testGetSingleEntryAndCompactionForFooKey() {
// FOO_KEY has revisions: [1, 3, 5].
@@ -377,6 +426,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowCompactedExceptionForGetSingleValue(FOO_KEY, 5);
}
+ /**
+ * Tests {@link KeyValueStorage#get(byte[], long)} using examples from the
description only for the {@link #BAR_KEY} for which the last
+ * revision is tombstone. Only one key is considered so that the tests are
not too long. Keys with their revisions are added in
+ * {@link #setUp()}.
+ */
@Test
void testGetSingleEntryAndCompactionForBarKey() {
// BAR_KEY has revisions: [1, 2, 5 (tombstone)].
@@ -405,6 +459,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowCompactedExceptionForGetSingleValue(BAR_KEY, 7);
}
+ /**
+ * Tests {@link KeyValueStorage#get(byte[], long)} using examples from the
description only for the {@link #NOT_EXISTS_KEY}, which
+ * was never present in the storage. Only one key is considered so that
the tests are not too long. Keys with their revisions are added
+ * in {@link #setUp()}.
+ */
@Test
void testGetSingleEntryAndCompactionForNotExistsKey() {
storage.setCompactionRevision(1);
@@ -432,6 +491,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowCompactedExceptionForGetSingleValue(NOT_EXISTS_KEY,
7);
}
+ /**
+ * Tests that {@link KeyValueStorage#getAll(List)} will not throw the
{@link CompactedException} for all keys after compacting to the
+ * penultimate repository revision. Keys with their revisions are added in
{@link #setUp()}.
+ */
@Test
void testGetAllLatestAndCompaction() {
storage.setCompactionRevision(6);
@@ -439,6 +502,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrow(() -> storage.getAll(List.of(FOO_KEY, BAR_KEY,
NOT_EXISTS_KEY)));
}
+ /**
+ * Tests {@link KeyValueStorage#getAll(List, long)} using examples from
the description only for the {@link #FOO_KEY} for which the
+ * last revision is <b>not</b> tombstone. Only one key is considered so
that the tests are not too long. Keys with their revisions are
+ * added in {@link #setUp()}.
+ */
@Test
void testGetAllAndCompactionForFooKey() {
// FOO_KEY has revisions: [1, 3, 5].
@@ -467,6 +535,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowCompactedExceptionForGetAll(5, FOO_KEY);
}
+ /**
+ * Tests {@link KeyValueStorage#getAll(List, long)} using examples from
the description only for the {@link #BAR_KEY} for which the
+ * last revision is tombstone. Only one key is considered so that the
tests are not too long. Keys with their revisions are added in
+ * {@link #setUp()}.
+ */
@Test
void testGetAllAndCompactionForBarKey() {
// BAR_KEY has revisions: [1, 2, 5 (tombstone)].
@@ -495,6 +568,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowCompactedExceptionForGetAll(7, BAR_KEY);
}
+ /**
+ * Tests {@link KeyValueStorage#getAll(List, long)} using examples from
the description only for the {@link #NOT_EXISTS_KEY}, which
+ * was never present in the storage. Only one key is considered so that
the tests are not too long. Keys with their revisions are added
+ * in {@link #setUp()}.
+ */
@Test
void testGetAllAndCompactionForNotExistsKey() {
storage.setCompactionRevision(1);
@@ -522,6 +600,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowCompactedExceptionForGetAll(7, NOT_EXISTS_KEY);
}
+ /**
+ * Tests {@link KeyValueStorage#getAll(List, long)} using examples from
the description for several keys at once; it is enough to
+ * consider only two cases of compaction. All keys are checked together so
that the test is not too long. Keys with their revisions are
+ * added in {@link #setUp()}.
+ */
@Test
void testGetAllAndCompactionForMultipleKeys() {
storage.setCompactionRevision(1);
@@ -536,6 +619,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowCompactedExceptionForGetAll(6, FOO_KEY, BAR_KEY,
NOT_EXISTS_KEY);
}
+ /**
+ * Tests {@link KeyValueStorage#get(byte[], long, long)} using examples
from the description only for the {@link #FOO_KEY} for which
+ * the last revision is <b>not</b> tombstone. Only one key is considered
so that the tests are not too long. Keys with their revisions
+ * are added in {@link #setUp()}.
+ */
@Test
void testGetListAndCompactionForFooKey() {
// FOO_KEY has revisions: [1, 3, 5].
@@ -593,6 +681,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertThrowsCompactedExceptionForGetList(FOO_KEY, 6, 7);
}
+ /**
+ * Tests {@link KeyValueStorage#get(byte[], long, long)} using examples
from the description only for the {@link #BAR_KEY} for which
+ * the last revision is tombstone. Only one key is considered so that the
tests are not too long. Keys with their revisions are added
+ * in {@link #setUp()}.
+ */
@Test
void testGetListAndCompactionForBarKey() {
// BAR_KEY has revisions: [1, 2, 5 (tombstone)].
@@ -652,6 +745,11 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowsCompactedExceptionForGetList(BAR_KEY, 7, 7);
}
+ /**
+ * Tests {@link KeyValueStorage#get(byte[], long, long)} using examples
from the description only for the {@link #NOT_EXISTS_KEY},
+ * which was never present in the storage. Only one key is considered so
that the tests are not too long. Keys with their revisions are
+ * added in {@link #setUp()}.
+ */
@Test
void testGetListAndCompactionForNotExistsKey() {
storage.setCompactionRevision(1);
@@ -690,6 +788,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowsCompactedExceptionForGetList(NOT_EXISTS_KEY, 7, 7);
}
+ /**
+ * Tests that {@link KeyValueStorage#range(byte[], byte[])} and cursor
methods will not throw {@link CompactedException} after
+ * compacting to the penultimate revision. The key is chosen randomly.
Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testRangeLatestAndCompaction() {
storage.setCompactionRevision(6);
@@ -702,6 +804,10 @@ public abstract class
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
});
}
+ /**
+ * Tests {@link KeyValueStorage#range(byte[], byte[], long)} and cursor
methods as described in the method. The key is chosen randomly.
+ * Keys with their revisions are added in {@link #setUp()}.
+ */
@Test
void testRangeAndCompaction() {
try (Cursor<Entry> cursorBeforeSetCompactionRevision =
storage.range(FOO_KEY, null, 5)) {
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index fad0672cb5..251ae61005 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -26,13 +26,10 @@ import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertCompactionRevisionLessThanCurrent;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.assertRequestedRevisionLessThanOrEqualToCurrent;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.indexToCompact;
-import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.isLastIndex;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.maxRevisionIndex;
-import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.minRevisionIndex;
import static
org.apache.ignite.internal.metastorage.server.KeyValueStorageUtils.toUtf8String;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
import static
org.apache.ignite.internal.metastorage.server.raft.MetaStorageWriteHandler.IDEMPOTENT_COMMAND_PREFIX;
-import static org.apache.ignite.internal.rocksdb.RocksUtils.incrementPrefix;
import static org.apache.ignite.internal.util.ArrayUtils.LONG_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -45,7 +42,6 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -53,26 +49,19 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.LongConsumer;
-import java.util.function.Predicate;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.CommandId;
import org.apache.ignite.internal.metastorage.Entry;
-import org.apache.ignite.internal.metastorage.RevisionUpdateListener;
-import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.Operations;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
import org.apache.ignite.internal.metastorage.exceptions.CompactedException;
import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite.internal.metastorage.impl.EntryImpl;
-import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
@@ -81,26 +70,27 @@ import org.jetbrains.annotations.Nullable;
/**
* Simple in-memory key/value storage for tests.
*/
-public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
- /** Lexicographical comparator. */
- private static final Comparator<byte[]> CMP = Arrays::compareUnsigned;
-
+public class SimpleInMemoryKeyValueStorage extends AbstractKeyValueStorage {
/**
* Keys index. Value is the list of all revisions under which entry
corresponding to the key was modified.
*
* <p>Concurrent map to avoid {@link
java.util.ConcurrentModificationException} on compaction.</p>
*
- * <p>Guarded by {@link #mux}.</p>
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
- private final NavigableMap<byte[], List<Long>> keysIdx = new
ConcurrentSkipListMap<>(CMP);
+ private final NavigableMap<byte[], List<Long>> keysIdx = new
ConcurrentSkipListMap<>(KEY_COMPARATOR);
- /** Timestamp to revision mapping. */
+ /**
+ * Timestamp to revision mapping.
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
+ */
private final NavigableMap<Long, Long> tsToRevMap = new TreeMap<>();
/**
* Revision to timestamp mapping.
*
- * <p>Guarded by {@link #mux}.</p>
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
private final Map<Long, HybridTimestamp> revToTsMap = new HashMap<>();
@@ -109,52 +99,28 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
*
* <p>Concurrent map to avoid {@link
java.util.ConcurrentModificationException} on compaction.</p>
*
- * <p>Guarded by {@link #mux}.</p>
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
private final NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx =
new ConcurrentSkipListMap<>();
- /**
- * Revision. Will be incremented for each single-entry or multi-entry
update operation.
- *
- * <p>Multi-threaded access is guarded by {@link #mux}.</p>
- */
- private long rev;
-
- /**
- * Last compaction revision that was set or restored from a snapshot.
- *
- * <p>This field is used by metastorage read methods to determine whether
{@link CompactedException} should be thrown.</p>
- *
- * <p>Multi-threaded access is guarded by {@link #mux}.</p>
- */
- private long compactionRevision = -1;
-
/**
* Last {@link #saveCompactionRevision saved} compaction revision.
*
* <p>Used only when working with snapshots.</p>
+ *
+ * <p>Multi-threaded access is guarded by {@link #rwLock}.</p>
*/
private long savedCompactionRevision = -1;
- /** All operations are queued on this lock. */
- private final Object mux = new Object();
-
+ /** Multi-threaded access is guarded by {@link #rwLock}. */
private boolean areWatchesEnabled = false;
- private final WatchProcessor watchProcessor;
-
+ /** Multi-threaded access is guarded by {@link #rwLock}. */
private final List<Entry> updatedEntries = new ArrayList<>();
- /**
- * Revision listener for recovery only. Notifies {@link
MetaStorageManagerImpl} of revision update.
- * Guarded by {@link #mux}.
- */
- private @Nullable LongConsumer recoveryRevisionListener;
-
- private final AtomicBoolean stopCompaction = new AtomicBoolean();
-
+ /** Constructor. */
public SimpleInMemoryKeyValueStorage(String nodeName) {
- this.watchProcessor = new WatchProcessor(nodeName, this::get, new
NoOpFailureManager());
+ super(nodeName, new NoOpFailureManager());
}
@Override
@@ -162,21 +128,18 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
// no-op
}
- @Override
- public long revision() {
- synchronized (mux) {
- return rev;
- }
- }
-
@Override
public void put(byte[] key, byte[] value, HybridTimestamp opTs) {
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
long curRev = rev + 1;
doPut(key, value, curRev, opTs);
updateRevision(curRev, opTs);
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -191,75 +154,38 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
notifyRevisionUpdate();
}
- /**
- * Notifies of revision update.
- * Must be called under the {@link #mux} lock.
- */
- private void notifyRevisionUpdate() {
- if (recoveryRevisionListener != null) {
- // Listener must be invoked only on recovery, after recovery
listener must be null.
- recoveryRevisionListener.accept(rev);
- }
- }
-
@Override
public void putAll(List<byte[]> keys, List<byte[]> values, HybridTimestamp
opTs) {
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
long curRev = rev + 1;
doPutAll(curRev, keys, values, opTs);
- }
- }
-
- @Override
- public Entry get(byte[] key) {
- synchronized (mux) {
- return doGet(key, rev);
- }
- }
-
- @Override
- public Entry get(byte[] key, long revUpperBound) {
- synchronized (mux) {
- return doGet(key, revUpperBound);
- }
- }
-
-
- @Override
- public List<Entry> get(byte[] key, long revLowerBound, long revUpperBound)
{
- synchronized (mux) {
- return doGet(key, revLowerBound, revUpperBound);
- }
- }
-
- @Override
- public List<Entry> getAll(List<byte[]> keys) {
- synchronized (mux) {
- return doGetAll(keys, rev);
- }
- }
-
- @Override
- public List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
- synchronized (mux) {
- return doGetAll(keys, revUpperBound);
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@Override
public void remove(byte[] key, HybridTimestamp opTs) {
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
long curRev = rev + 1;
doRemove(key, curRev, opTs);
updateRevision(curRev, opTs);
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@Override
public void removeAll(List<byte[]> keys, HybridTimestamp opTs) {
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
long curRev = rev + 1;
List<byte[]> existingKeys = new ArrayList<>(keys.size());
@@ -279,6 +205,8 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
doPutAll(curRev, existingKeys, vals, opTs);
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -290,7 +218,9 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
HybridTimestamp opTs,
CommandId commandId
) {
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
Collection<Entry> e = getAll(Arrays.asList(condition.keys()));
boolean branch = condition.test(e.toArray(new Entry[]{}));
@@ -329,12 +259,16 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
updateRevision(curRev, opTs);
return branch;
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@Override
public StatementResult invoke(If iif, HybridTimestamp opTs, CommandId
commandId) {
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
If currIf = iif;
while (true) {
Collection<Entry> e =
getAll(Arrays.asList(currIf.cond().keys()));
@@ -380,33 +314,40 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
currIf = branch.iif();
}
}
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@Override
public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
- synchronized (mux) {
+ rwLock.readLock().lock();
+
+ try {
return doRange(keyFrom, keyTo, rev);
+ } finally {
+ rwLock.readLock().unlock();
}
}
@Override
public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long
revUpperBound) {
- synchronized (mux) {
+ rwLock.readLock().lock();
+
+ try {
return doRange(keyFrom, keyTo, revUpperBound);
+ } finally {
+ rwLock.readLock().unlock();
}
}
- @Override
- public byte @Nullable [] nextKey(byte[] key) {
- return incrementPrefix(key);
- }
-
@Override
public HybridTimestamp timestampByRevision(long revision) {
assert revision >= 0 : revision;
- synchronized (mux) {
+ rwLock.readLock().lock();
+
+ try {
assertRequestedRevisionLessThanOrEqualToCurrent(revision, rev);
HybridTimestamp timestamp = revToTsMap.get(revision);
@@ -416,12 +357,16 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
return timestamp;
+ } finally {
+ rwLock.readLock().unlock();
}
}
@Override
public long revisionByTimestamp(HybridTimestamp timestamp) {
- synchronized (mux) {
+ rwLock.readLock().lock();
+
+ try {
Map.Entry<Long, Long> revisionEntry =
tsToRevMap.floorEntry(timestamp.longValue());
if (revisionEntry == null) {
@@ -429,62 +374,25 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
return revisionEntry.getValue();
+ } finally {
+ rwLock.readLock().unlock();
}
}
- @Override
- public void setRecoveryRevisionListener(@Nullable LongConsumer listener) {
- synchronized (mux) {
- this.recoveryRevisionListener = listener;
- }
- }
-
- @Override
- public void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev,
WatchListener listener) {
- assert keyFrom != null : "keyFrom couldn't be null.";
- assert rev > 0 : "rev must be positive.";
-
- Predicate<byte[]> rangePredicate = keyTo == null
- ? k -> CMP.compare(keyFrom, k) <= 0
- : k -> CMP.compare(keyFrom, k) <= 0 && CMP.compare(keyTo, k) >
0;
-
- watchProcessor.addWatch(new Watch(rev, listener, rangePredicate));
- }
-
- @Override
- public void watchExact(byte[] key, long rev, WatchListener listener) {
- assert key != null : "key couldn't be null.";
- assert rev > 0 : "rev must be positive.";
-
- Predicate<byte[]> exactPredicate = k -> CMP.compare(k, key) == 0;
-
- watchProcessor.addWatch(new Watch(rev, listener, exactPredicate));
- }
-
- @Override
- public void watchExact(Collection<byte[]> keys, long rev, WatchListener
listener) {
- assert keys != null && !keys.isEmpty() : "keys couldn't be null or
empty: " + keys;
- assert rev > 0 : "rev must be positive.";
-
- TreeSet<byte[]> keySet = new TreeSet<>(CMP);
-
- keySet.addAll(keys);
-
- Predicate<byte[]> inPredicate = keySet::contains;
-
- watchProcessor.addWatch(new Watch(rev, listener, inPredicate));
- }
-
@Override
public void startWatches(long startRevision, OnRevisionAppliedCallback
revisionCallback) {
- assert startRevision != 0 : "First meaningful revision is 1";
+ assert startRevision > 0 : startRevision;
- synchronized (mux) {
+ rwLock.readLock().lock();
+
+ try {
areWatchesEnabled = true;
watchProcessor.setRevisionCallback(revisionCallback);
replayUpdates(startRevision);
+ } finally {
+ rwLock.readLock().unlock();
}
}
@@ -524,17 +432,14 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
updatedEntries.clear();
}
- @Override
- public void removeWatch(WatchListener listener) {
- watchProcessor.removeWatch(listener);
- }
-
@Override
public void compact(long revision) {
assert revision >= 0 : revision;
for (Map.Entry<byte[], List<Long>> entry : keysIdx.entrySet()) {
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
assertCompactionRevisionLessThanCurrent(revision, rev);
if (stopCompaction.get()) {
@@ -542,10 +447,14 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
compactForKey(entry.getKey(), toLongArray(entry.getValue()),
revision);
+ } finally {
+ rwLock.writeLock().unlock();
}
}
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
for (Iterator<Map.Entry<Long, HybridTimestamp>> it =
revToTsMap.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<Long, HybridTimestamp> e = it.next();
@@ -557,14 +466,11 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
break;
}
}
+ } finally {
+ rwLock.writeLock().unlock();
}
}
- @Override
- public void stopCompaction() {
- stopCompaction.set(true);
- }
-
@Override
public void close() {
stopCompaction();
@@ -574,78 +480,82 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
@Override
public CompletableFuture<Void> snapshot(Path snapshotPath) {
- synchronized (mux) {
- try {
- Files.createDirectories(snapshotPath);
+ rwLock.writeLock().lock();
- Path snapshotFile =
snapshotPath.resolve(SimpleInMemoryKeyValueStorageSnapshot.FILE_NAME);
+ try {
+ Files.createDirectories(snapshotPath);
- assertTrue(IgniteUtils.deleteIfExists(snapshotFile),
snapshotFile.toString());
+ Path snapshotFile =
snapshotPath.resolve(SimpleInMemoryKeyValueStorageSnapshot.FILE_NAME);
- Files.createFile(snapshotFile);
+ assertTrue(IgniteUtils.deleteIfExists(snapshotFile),
snapshotFile.toString());
- Map<Long, Map<byte[], ValueSnapshot>> revsIdxCopy =
revsIdx.entrySet().stream()
- .collect(toMap(
- Map.Entry::getKey,
- revIdxEntry -> revIdxEntry.getValue()
- .entrySet()
- .stream()
- .collect(toMap(Map.Entry::getKey, e ->
new ValueSnapshot(e.getValue())))
- ));
+ Files.createFile(snapshotFile);
- var snapshot = new SimpleInMemoryKeyValueStorageSnapshot(
- Map.copyOf(keysIdx),
- Map.copyOf(tsToRevMap),
- Map.copyOf(revToTsMap),
- revsIdxCopy,
- rev,
- savedCompactionRevision
- );
+ Map<Long, Map<byte[], ValueSnapshot>> revsIdxCopy =
revsIdx.entrySet().stream()
+ .collect(toMap(
+ Map.Entry::getKey,
+ revIdxEntry -> revIdxEntry.getValue()
+ .entrySet()
+ .stream()
+ .collect(toMap(Map.Entry::getKey, e -> new
ValueSnapshot(e.getValue())))
+ ));
- byte[] snapshotBytes = ByteUtils.toBytes(snapshot);
+ var snapshot = new SimpleInMemoryKeyValueStorageSnapshot(
+ Map.copyOf(keysIdx),
+ Map.copyOf(tsToRevMap),
+ Map.copyOf(revToTsMap),
+ revsIdxCopy,
+ rev,
+ savedCompactionRevision
+ );
- Files.write(snapshotFile, snapshotBytes, WRITE);
+ byte[] snapshotBytes = ByteUtils.toBytes(snapshot);
- return nullCompletedFuture();
- } catch (Throwable t) {
- return failedFuture(t);
- }
+ Files.write(snapshotFile, snapshotBytes, WRITE);
+
+ return nullCompletedFuture();
+ } catch (Throwable t) {
+ return failedFuture(t);
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@Override
public void restoreSnapshot(Path snapshotPath) {
- synchronized (mux) {
- try {
- keysIdx.clear();
- tsToRevMap.clear();
- revToTsMap.clear();
- revsIdx.clear();
+ rwLock.writeLock().lock();
- Path snapshotFile =
snapshotPath.resolve(SimpleInMemoryKeyValueStorageSnapshot.FILE_NAME);
+ try {
+ keysIdx.clear();
+ tsToRevMap.clear();
+ revToTsMap.clear();
+ revsIdx.clear();
- assertTrue(Files.exists(snapshotPath),
snapshotFile.toString());
+ Path snapshotFile =
snapshotPath.resolve(SimpleInMemoryKeyValueStorageSnapshot.FILE_NAME);
- byte[] snapshotBytes = Files.readAllBytes(snapshotFile);
+ assertTrue(Files.exists(snapshotPath), snapshotFile.toString());
- var snapshot = (SimpleInMemoryKeyValueStorageSnapshot)
ByteUtils.fromBytes(snapshotBytes);
+ byte[] snapshotBytes = Files.readAllBytes(snapshotFile);
- keysIdx.putAll(snapshot.keysIdx);
- tsToRevMap.putAll(snapshot.tsToRevMap);
- revToTsMap.putAll(snapshot.revToTsMap);
- snapshot.revsIdx.forEach((revision, entries) -> {
- TreeMap<byte[], Value> entries0 = new TreeMap<>(CMP);
- entries.forEach((keyBytes, valueSnapshot) ->
entries0.put(keyBytes, valueSnapshot.toValue()));
+ var snapshot = (SimpleInMemoryKeyValueStorageSnapshot)
ByteUtils.fromBytes(snapshotBytes);
- revsIdx.put(revision, entries0);
- });
+ keysIdx.putAll(snapshot.keysIdx);
+ tsToRevMap.putAll(snapshot.tsToRevMap);
+ revToTsMap.putAll(snapshot.revToTsMap);
+ snapshot.revsIdx.forEach((revision, entries) -> {
+ TreeMap<byte[], Value> entries0 = new
TreeMap<>(KEY_COMPARATOR);
+ entries.forEach((keyBytes, valueSnapshot) ->
entries0.put(keyBytes, valueSnapshot.toValue()));
- rev = snapshot.rev;
- compactionRevision = snapshot.savedCompactionRevision;
- savedCompactionRevision = snapshot.savedCompactionRevision;
- } catch (Throwable t) {
- throw new MetaStorageException(RESTORING_STORAGE_ERR, t);
- }
+ revsIdx.put(revision, entries0);
+ });
+
+ rev = snapshot.rev;
+ compactionRevision = snapshot.savedCompactionRevision;
+ savedCompactionRevision = snapshot.savedCompactionRevision;
+ } catch (Throwable t) {
+ throw new MetaStorageException(RESTORING_STORAGE_ERR, t);
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -699,79 +609,6 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
}
- private List<Entry> doGetAll(List<byte[]> keys, long revUpperBound) {
- assert !keys.isEmpty();
- assert revUpperBound >= 0 : revUpperBound;
-
- var res = new ArrayList<Entry>(keys.size());
-
- for (byte[] key : keys) {
- res.add(doGet(key, revUpperBound));
- }
-
- return res;
- }
-
- private Entry doGet(byte[] key, long revUpperBound) {
- assert revUpperBound >= 0 : revUpperBound;
-
- long[] keyRevisions = toLongArray(keysIdx.get(key));
- int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
-
- if (maxRevisionIndex == NOT_FOUND) {
-
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound,
compactionRevision);
-
- return EntryImpl.empty(key);
- }
-
- long revision = keyRevisions[maxRevisionIndex];
-
- Value value = getValue(key, revision);
-
- if (revUpperBound <= compactionRevision && (!isLastIndex(keyRevisions,
maxRevisionIndex) || value.tombstone())) {
- throw new CompactedException(revUpperBound, compactionRevision);
- }
-
- return EntryImpl.toEntry(key, revision, value);
- }
-
- private List<Entry> doGet(byte[] key, long revLowerBound, long
revUpperBound) {
- assert revLowerBound >= 0 : revLowerBound;
- assert revUpperBound >= 0 : revUpperBound;
- assert revUpperBound >= revLowerBound : "revLowerBound=" +
revLowerBound + ", revUpperBound=" + revUpperBound;
-
- long[] keyRevisions = toLongArray(keysIdx.get(key));
-
- int minRevisionIndex = minRevisionIndex(keyRevisions, revLowerBound);
- int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
-
- if (minRevisionIndex == NOT_FOUND || maxRevisionIndex == NOT_FOUND) {
-
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
compactionRevision);
-
- return List.of();
- }
-
- var entries = new ArrayList<Entry>();
-
- for (int i = minRevisionIndex; i <= maxRevisionIndex; i++) {
- long revision = keyRevisions[i];
-
- Value value = getValue(key, revision);
-
- if (revision <= compactionRevision && (!isLastIndex(keyRevisions,
i) || value.tombstone())) {
- continue;
- }
-
- entries.add(EntryImpl.toEntry(key, revision, value));
- }
-
- if (entries.isEmpty()) {
-
CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revLowerBound,
compactionRevision);
- }
-
- return entries;
- }
-
private void doPut(byte[] key, byte[] bytes, long curRev, HybridTimestamp
opTs) {
// Update keysIdx.
List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
@@ -785,7 +622,7 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
curRev,
(rev, entries) -> {
if (entries == null) {
- entries = new TreeMap<>(CMP);
+ entries = new TreeMap<>(KEY_COMPARATOR);
}
entries.put(key, val);
@@ -801,61 +638,43 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
}
private void doPutAll(long curRev, List<byte[]> keys, List<byte[]>
bytesList, HybridTimestamp opTs) {
- synchronized (mux) {
- // Update revsIdx.
- NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
-
- for (int i = 0; i < keys.size(); i++) {
- byte[] key = keys.get(i);
-
- byte[] bytes = bytesList.get(i);
+ // Update revsIdx.
+ NavigableMap<byte[], Value> entries = new TreeMap<>(KEY_COMPARATOR);
- // Update keysIdx.
- List<Long> revs = keysIdx.computeIfAbsent(key, k -> new
ArrayList<>());
+ for (int i = 0; i < keys.size(); i++) {
+ byte[] key = keys.get(i);
- revs.add(curRev);
+ byte[] bytes = bytesList.get(i);
- Value val = new Value(bytes, opTs);
+ // Update keysIdx.
+ List<Long> revs = keysIdx.computeIfAbsent(key, k -> new
ArrayList<>());
- entries.put(key, val);
+ revs.add(curRev);
- updatedEntries.add(new EntryImpl(key, bytes, curRev, opTs));
+ Value val = new Value(bytes, opTs);
- revsIdx.put(curRev, entries);
- }
+ entries.put(key, val);
- updateRevision(curRev, opTs);
+ updatedEntries.add(new EntryImpl(key, bytes, curRev, opTs));
+ revsIdx.put(curRev, entries);
}
- }
- private static long lastRevision(List<Long> revs) {
- return revs.get(revs.size() - 1);
- }
-
- @Override
- public void registerRevisionUpdateListener(RevisionUpdateListener
listener) {
- watchProcessor.registerRevisionUpdateListener(listener);
- }
-
- @Override
- public void unregisterRevisionUpdateListener(RevisionUpdateListener
listener) {
- watchProcessor.unregisterRevisionUpdateListener(listener);
- }
-
- @Override
- public CompletableFuture<Void> notifyRevisionUpdateListenerOnStart(long
newRevision) {
- return watchProcessor.notifyUpdateRevisionListeners(newRevision);
+ updateRevision(curRev, opTs);
}
@Override
public void advanceSafeTime(HybridTimestamp newSafeTime) {
- synchronized (mux) {
+ rwLock.writeLock().lock();
+
+ try {
if (!areWatchesEnabled) {
return;
}
watchProcessor.advanceSafeTime(newSafeTime);
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -863,28 +682,14 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
public void saveCompactionRevision(long revision) {
assert revision >= 0 : revision;
- synchronized (mux) {
- assertCompactionRevisionLessThanCurrent(revision, rev);
+ rwLock.writeLock().lock();
- savedCompactionRevision = revision;
- }
- }
-
- @Override
- public void setCompactionRevision(long revision) {
- assert revision >= 0 : revision;
-
- synchronized (mux) {
+ try {
assertCompactionRevisionLessThanCurrent(revision, rev);
- compactionRevision = revision;
- }
- }
-
- @Override
- public long getCompactionRevision() {
- synchronized (mux) {
- return compactionRevision;
+ savedCompactionRevision = revision;
+ } finally {
+ rwLock.writeLock().unlock();
}
}
@@ -917,7 +722,13 @@ public class SimpleInMemoryKeyValueStorage implements
KeyValueStorage {
return value.tombstone();
}
- private Value getValue(byte[] key, long revision) {
+ @Override
+ protected long[] keyRevisionsForOperation(byte[] key) {
+ return toLongArray(keysIdx.get(key));
+ }
+
+ @Override
+ protected Value valueForOperation(byte[] key, long revision) {
Value value = getValueNullable(key, revision);
assert value != null : "key=" + toUtf8String(key) + ", revision=" +
revision;
diff --git
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index f7131c30ed..9c964b7665 100644
---
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -29,6 +29,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
+import org.apache.ignite.internal.metastorage.server.AbstractKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.WatchProcessor;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
@@ -66,12 +67,10 @@ public class WatchListenerInhibitor {
* @return Listener inhibitor.
*/
public static WatchListenerInhibitor
metastorageEventsInhibitor(MetaStorageManager metaStorageManager) {
- var metaStorageManager0 = metaStorageManager;
-
// TODO: IGNITE-15723 After a component factory is implemented, need
to got rid of reflection here.
- var storage = (RocksDbKeyValueStorage)
getFieldValue(metaStorageManager0, MetaStorageManagerImpl.class, "storage");
+ var storage = (RocksDbKeyValueStorage)
getFieldValue(metaStorageManager, MetaStorageManagerImpl.class, "storage");
- var watchProcessor = (WatchProcessor) getFieldValue(storage,
RocksDbKeyValueStorage.class, "watchProcessor");
+ var watchProcessor = (WatchProcessor) getFieldValue(storage,
AbstractKeyValueStorage.class, "watchProcessor");
return new WatchListenerInhibitor(watchProcessor, storage);
}
@@ -81,7 +80,7 @@ public class WatchListenerInhibitor {
this.storage = storage;
processorNotificationFutureField = getField(watchProcessor,
WatchProcessor.class, "notificationFuture");
- storageRwLockField = getField(storage, RocksDbKeyValueStorage.class,
"rwLock");
+ storageRwLockField = getField(storage, AbstractKeyValueStorage.class,
"rwLock");
}
/**