This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f1ac1dba18 IGNITE-23288 Throw CompactedException for metastorage
cursors and publishers (#4536)
f1ac1dba18 is described below
commit f1ac1dba183c03e76cd3fd838dbd8e19cd5c1d54
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Oct 11 21:02:18 2024 +0300
IGNITE-23288 Throw CompactedException for metastorage cursors and
publishers (#4536)
---
.../internal/metastorage/MetaStorageManager.java | 219 +++++++++------------
.../metastorage/impl/MetaStorageManagerImpl.java | 21 +-
.../metastorage/server/KeyValueStorage.java | 40 ++--
.../metastorage/server/KeyValueStorageUtils.java | 5 +-
.../server/persistence/RocksDbKeyValueStorage.java | 219 ++++++++-------------
.../AbstractCompactionKeyValueStorageTest.java | 34 ++++
.../server/SimpleInMemoryKeyValueStorage.java | 96 ++++-----
7 files changed, 280 insertions(+), 354 deletions(-)
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 9cf32ec6b4..e4c6f7fd7e 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -54,72 +54,29 @@ public interface MetaStorageManager extends IgniteComponent
{
long appliedRevision();
/**
- * Returns a future of getting the latest version of an entry by key from
the metastore leader.
+ * Returns a future of getting the latest version of an entry by key from
the metastorage leader.
*
* <p>Never completes with a {@link CompactedException}.</p>
*
* <p>Future may complete with {@link NodeStoppingException} if the node
is in the process of stopping.</p>
*
- * @param key The key.
+ * @param key Key.
*/
CompletableFuture<Entry> get(ByteArray key);
/**
- * Returns a future of getting an entry for the given key and the revision
upper bound from the metastore leader.
+ * Returns a future of getting an entry for the given key and the revision
upper bound from the metastorage leader.
*
* <p>Future may complete with exceptions:</p>
* <ul>
* <li>{@link NodeStoppingException} - if the node is in the process
of stopping.</li>
- * <li>{@link CompactedException} - If the requested entry was not
found and the {@code revUpperBound} is less than or equal to the
- * last compacted one.</li>
- * </ul>
- *
- * <p>Let's consider examples of the work of the method and compaction of
the metastore. Let's assume that we have keys with revisions
- * "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" has never
been in the metastore.</p>
- * <ul>
- * <li>Compaction revision is {@code 1}.
- * <ul>
- * <li>get("foo", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("foo", 2) - will return a single value with revision
2.</li>
- * <li>get("foo", 3) - will return a single value with revision
2.</li>
- * <li>get("bar", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 2) - will return a single value with revision
2.</li>
- * <li>get("bar", 3) - will return a single value with revision
2.</li>
- * <li>get("some", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 2) - will return an empty value.</li>
- * <li>get("some", 3) - will return an empty value.</li>
- * </ul>
- * </li>
- * <li>Compaction revision is {@code 2}.
- * <ul>
- * <li>get("foo", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("foo", 2) - will return a single value with revision
2.</li>
- * <li>get("foo", 3) - will return a single value with revision
2.</li>
- * <li>get("bar", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 2) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 3) - will return a single value with revision
2.</li>
- * <li>get("some", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 2) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 3) - will return an empty value.</li>
- * </ul>
- * </li>
- * <li>Compaction revision is {@code 3}.
- * <ul>
- * <li>get("foo", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("foo", 2) - will return a single value with revision
2.</li>
- * <li>get("foo", 3) - will return a single value with revision
2.</li>
- * <li>get("bar", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 2) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 3) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 2) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 3) - a {@link CompactedException} will be
thrown.</li>
- * </ul>
- * </li>
+ * <li>{@link CompactedException} - if the requested entry was not
found and the {@code revUpperBound} is less than or equal to the
+ * last compacted one. For examples see {@link #getLocally(ByteArray,
long)}.</li>
* </ul>
*
- * @param key The key.
- * @param revUpperBound The upper bound of revision.
+ * @param key Key.
+ * @param revUpperBound Upper bound of revision (inclusive).
+ * @see #getLocally(ByteArray, long)
*/
CompletableFuture<Entry> get(ByteArray key, long revUpperBound);
@@ -216,53 +173,52 @@ public interface MetaStorageManager extends
IgniteComponent {
* <p>This method doesn't wait for the storage's revision to become
greater or equal to the revUpperBound parameter, so it is
* up to user to wait for the appropriate time to call this method.
*
- * <p>Let's consider examples of the work of the method and compaction of
the metastore. Let's assume that we have keys with revisions
- * "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" has never
been in the metastore.</p>
+ * <p>Let's consider examples of the work of the method and compaction of
the metastorage. Let's assume that we have keys with revisions
+ * "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" has never
been in the metastorage.</p>
* <ul>
* <li>Compaction revision is {@code 1}.
* <ul>
- * <li>get("foo", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("foo", 2) - will return a single value with revision
2.</li>
- * <li>get("foo", 3) - will return a single value with revision
2.</li>
- * <li>get("bar", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 2) - will return a single value with revision
2.</li>
- * <li>get("bar", 3) - will return a single value with revision
2.</li>
- * <li>get("some", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 2) - will return an empty value.</li>
- * <li>get("some", 3) - will return an empty value.</li>
+ * <li>getLocally("foo", 1) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("foo", 2) - will return a single value with
revision 2.</li>
+ * <li>getLocally("foo", 3) - will return a single value with
revision 2.</li>
+ * <li>getLocally("bar", 1) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("bar", 2) - will return a single value with
revision 2.</li>
+ * <li>getLocally("bar", 3) - will return a single value with
revision 2.</li>
+ * <li>getLocally("some", 1) - a {@link CompactedException} will
be thrown.</li>
+ * <li>getLocally("some", 2) - will return an empty value.</li>
+ * <li>getLocally("some", 3) - will return an empty value.</li>
* </ul>
* </li>
* <li>Compaction revision is {@code 2}.
* <ul>
- * <li>get("foo", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("foo", 2) - will return a single value with revision
2.</li>
- * <li>get("foo", 3) - will return a single value with revision
2.</li>
- * <li>get("bar", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 2) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 3) - will return a single value with revision
2.</li>
- * <li>get("some", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 2) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 3) - will return an empty value.</li>
+ * <li>getLocally("foo", 1) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("foo", 2) - will return a single value with
revision 2.</li>
+ * <li>getLocally("foo", 3) - will return a single value with
revision 2.</li>
+ * <li>getLocally("bar", 1) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("bar", 2) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("bar", 3) - will return a single value with
revision 2.</li>
+ * <li>getLocally("some", 1) - a {@link CompactedException} will
be thrown.</li>
+ * <li>getLocally("some", 2) - a {@link CompactedException} will
be thrown.</li>
+ * <li>getLocally("some", 3) - will return an empty value.</li>
* </ul>
* </li>
* <li>Compaction revision is {@code 3}.
* <ul>
- * <li>get("foo", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("foo", 2) - will return a single value with revision
2.</li>
- * <li>get("foo", 3) - will return a single value with revision
2.</li>
- * <li>get("bar", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 2) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("bar", 3) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 1) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 2) - a {@link CompactedException} will be
thrown.</li>
- * <li>get("some", 3) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("foo", 1) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("foo", 2) - will return a single value with
revision 2.</li>
+ * <li>getLocally("foo", 3) - will return a single value with
revision 2.</li>
+ * <li>getLocally("bar", 1) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("bar", 2) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("bar", 3) - a {@link CompactedException} will be
thrown.</li>
+ * <li>getLocally("some", 1) - a {@link CompactedException} will
be thrown.</li>
+ * <li>getLocally("some", 2) - a {@link CompactedException} will
be thrown.</li>
+ * <li>getLocally("some", 3) - a {@link CompactedException} will
be thrown.</li>
* </ul>
* </li>
* </ul>
*
- * @param key The key.
- * @param revUpperBound The upper bound of revision.
- * @return Value corresponding to the given key.
+ * @param key Key.
+ * @param revUpperBound Upper bound of revision (inclusive).
* @throws IgniteInternalException with cause {@link
NodeStoppingException} if the node is in the process of stopping.
* @throws CompactedException If the requested entry was not found and the
{@code revUpperBound} is less than or equal to the last
* compacted one.
@@ -270,29 +226,37 @@ public interface MetaStorageManager extends
IgniteComponent {
Entry getLocally(ByteArray key, long revUpperBound);
/**
- * Returns cursor by entries which correspond to the given keys range and
bounded by revision number. The entries in the cursor
- * are obtained from the local storage.
+ * Returns cursor by entries which correspond to the given keys range and
bounded by revision number locally.
+ *
+ * <p>Cursor will iterate over a snapshot of keys and their revisions at
the time the method was invoked.</p>
+ *
+ * <p>Cursor methods never throw {@link CompactedException}.</p>
*
* <p>This method doesn't wait for the storage's revision to become
greater or equal to the revUpperBound parameter, so it is
- * up to user to wait for the appropriate time to call this method.
+ * up to user to wait for the appropriate time to call this method.</p>
*
* @param startKey Start key of range (inclusive).
- * @param endKey Last key of range (exclusive).
- * @param revUpperBound Upper bound of revision.
- * @return Cursor by entries which correspond to the given keys range.
+ * @param endKey Last key of range (exclusive), {@code null} represents an
unbound range.
+ * @param revUpperBound Upper bound of revision (inclusive) for each key.
+ * @throws IgniteInternalException with cause {@link
NodeStoppingException} if the node is in the process of stopping.
+ * @throws CompactedException If the {@code revUpperBound} is less than or
equal to the last compacted one.
*/
- Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long
revUpperBound);
+ Cursor<Entry> getLocally(ByteArray startKey, @Nullable ByteArray endKey,
long revUpperBound);
/**
- * Returns cursor by entries which correspond to the given key prefix and
bounded by revision number. The entries in the cursor
- * are obtained from the local storage.
+ * Returns cursor by entries which correspond to the given key prefix and
bounded by revision number locally.
+ *
+ * <p>Cursor will iterate over a snapshot of keys and their revisions at
the time the method was invoked.</p>
+ *
+ * <p>Cursor methods never throw {@link CompactedException}.</p>
*
* <p>This method doesn't wait for the storage's revision to become
greater or equal to the revUpperBound parameter, so it is
- * up to user to wait for the appropriate time to call this method.
+ * up to user to wait for the appropriate time to call this method.</p>
*
* @param keyPrefix Key prefix.
- * @param revUpperBound Upper bound of revision.
- * @return Cursor by entries which correspond to the given key prefix.
+ * @param revUpperBound Upper bound of revision (inclusive) for each key.
+ * @throws IgniteInternalException with cause {@link
NodeStoppingException} if the node is in the process of stopping.
+ * @throws CompactedException If the {@code revUpperBound} is less than or
equal to the last compacted one.
*/
Cursor<Entry> prefixLocally(ByteArray keyPrefix, long revUpperBound);
@@ -310,7 +274,7 @@ public interface MetaStorageManager extends IgniteComponent
{
HybridTimestamp timestampByRevisionLocally(long revision);
/**
- * Returns a future of getting the latest version of entries corresponding
to the given keys from the metastore leader.
+ * Returns a future of getting the latest version of entries corresponding
to the given keys from the metastorage leader.
*
* <p>Never completes with a {@link CompactedException}.</p>
*
@@ -341,48 +305,49 @@ public interface MetaStorageManager extends
IgniteComponent {
CompletableFuture<Void> removeAll(Set<ByteArray> keys);
/**
- * Retrieves entries for the given key prefix in lexicographic order.
Shortcut for {@link #prefix(ByteArray, long)} where
- * {@code revUpperBound = LATEST_REVISION}.
+ * Returns a publisher for getting the latest version of entries for
the given key prefix from the metastorage leader.
*
- * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be
{@code null}.
- * @return Publisher that will provide entries corresponding to the given
prefix. This Publisher may also fail (by calling
- * {@link Subscriber#onError}) with one of the following exceptions:
- * <ul>
- * <li>{@link OperationTimeoutException} - if the operation is
timed out;</li>
- * <li>{@link CompactedException} - if the desired revisions are
removed from the storage due to a compaction;</li>
- * <li>{@link NodeStoppingException} - if this node has been
stopped.</li>
- * </ul>
+ * <p>Never fails with a {@link CompactedException}.</p>
+ *
+ * <p>Publisher may fail (by calling {@link Subscriber#onError}) with one
of the following exceptions:</p>
+ * <ul>
+ * <li>{@link NodeStoppingException} - if the node is in the process
of stopping.</li>
+ * <li>{@link OperationTimeoutException} - if the operation is timed
out.</li>
+ * </ul>
+ *
+ * @param keyPrefix Key prefix.
*/
Publisher<Entry> prefix(ByteArray keyPrefix);
/**
- * Retrieves entries for the given key prefix in lexicographic order.
Entries will be filtered out by upper bound of given revision
- * number.
+ * Returns a publisher for getting entries for the given key prefix and
the revision upper bound from the metastorage leader.
*
- * @param keyPrefix Prefix of the key to retrieve the entries. Couldn't be
{@code null}.
- * @param revUpperBound The upper bound for entry revision or {@link
MetaStorageManager#LATEST_REVISION} for no revision bound.
- * @return Publisher that will provide entries corresponding to the given
prefix and revision. This Publisher may also fail (by calling
- * {@link Subscriber#onError}) with one of the following exceptions:
- * <ul>
- * <li>{@link OperationTimeoutException} - if the operation is
timed out;</li>
- * <li>{@link CompactedException} - if the desired revisions are
removed from the storage due to a compaction;</li>
- * <li>{@link NodeStoppingException} - if this node has been
stopped.</li>
- * </ul>
+ * <p>Publisher may fail (by calling {@link Subscriber#onError}) with one
of the following exceptions:</p>
+ * <ul>
+ * <li>{@link NodeStoppingException} - if the node is in the process
of stopping.</li>
+ * <li>{@link OperationTimeoutException} - if the operation is timed
out.</li>
+ * <li>{@link CompactedException} - if the {@code revUpperBound} is
less than or equal to the last compacted one on the metastorage
+ * leader; this can occur while processing any batch of entries.</li>
+ * </ul>
+ *
+ * @param keyPrefix Key prefix.
+ * @param revUpperBound Upper bound of revision (inclusive) for each key.
*/
Publisher<Entry> prefix(ByteArray keyPrefix, long revUpperBound);
/**
- * Retrieves entries for the given key range in lexicographic order.
+ * Returns a publisher for getting the latest version of entries for
the given keys range from the metastorage leader.
*
- * @param keyFrom Range lower bound (inclusive).
- * @param keyTo Range upper bound (exclusive), {@code null} represents an
unbound range.
- * @return Publisher that will provide entries corresponding to the given
range. This Publisher may also fail (by calling
- * {@link Subscriber#onError}) with one of the following exceptions:
- * <ul>
- * <li>{@link OperationTimeoutException} - if the operation is
timed out;</li>
- * <li>{@link CompactedException} - if the desired revisions are
removed from the storage due to a compaction;</li>
- * <li>{@link NodeStoppingException} - if this node has been
stopped.</li>
- * </ul>
+ * <p>Never fails with a {@link CompactedException}.</p>
+ *
+ * <p>Publisher may fail (by calling {@link Subscriber#onError}) with one
of the following exceptions:</p>
+ * <ul>
+ * <li>{@link NodeStoppingException} - if the node is in the process
of stopping.</li>
+ * <li>{@link OperationTimeoutException} - if the operation is timed
out.</li>
+ * </ul>
+ *
+ * @param keyFrom Start key of range (inclusive).
+ * @param keyTo Last key of range (exclusive), {@code null} represents an
unbound range.
*/
Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray keyTo);
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index fee2698fcf..8b18fd6d9d 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -764,15 +764,17 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
@Override
public Cursor<Entry> getLocally(ByteArray startKey, ByteArray endKey, long
revUpperBound) {
- return storage.range(startKey.bytes(), endKey.bytes(), revUpperBound);
+ return inBusyLock(busyLock, () -> storage.range(startKey.bytes(),
endKey == null ? null : endKey.bytes(), revUpperBound));
}
@Override
public Cursor<Entry> prefixLocally(ByteArray keyPrefix, long
revUpperBound) {
- byte[] rangeStart = keyPrefix.bytes();
- byte[] rangeEnd = storage.nextKey(rangeStart);
+ return inBusyLock(busyLock, () -> {
+ byte[] rangeStart = keyPrefix.bytes();
+ byte[] rangeEnd = storage.nextKey(rangeStart);
- return storage.range(rangeStart, rangeEnd, revUpperBound);
+ return storage.range(rangeStart, rangeEnd, revUpperBound);
+ });
}
@Override
@@ -916,21 +918,12 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
@Override
public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray
keyTo) {
- return range(keyFrom, keyTo, false);
- }
-
- /**
- * Retrieves entries for the given key range in lexicographic order.
- *
- * @see MetaStorageService#range(ByteArray, ByteArray, boolean)
- */
- public Publisher<Entry> range(ByteArray keyFrom, @Nullable ByteArray
keyTo, boolean includeTombstones) {
if (!busyLock.enterBusy()) {
return new NodeStoppingPublisher<>();
}
try {
- return new
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc ->
svc.range(keyFrom, keyTo, includeTombstones)));
+ return new
CompletableFuturePublisher<>(metaStorageSvcFut.thenApply(svc ->
svc.range(keyFrom, keyTo, false)));
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 0120df0c88..ccbaa1d580 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -67,8 +67,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
/**
* Returns an entry by the given key and bounded by the given revision.
*
- * <p>Let's consider examples of the work of the method and compaction of
the metastore. Let's assume that we have keys with revisions
- * "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some" has never
been in the metastore.</p>
+ * <p>Let's consider examples of the work of the method and compaction of
the metastorage. Let's assume that we have keys with
+ * revisions "foo" [1, 2] and "bar" [1, 2 (tombstone)], and the key "some"
has never been in the metastorage.</p>
* <ul>
* <li>Compaction revision is {@code 1}.
* <ul>
@@ -112,7 +112,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
* </ul>
*
* @param key Key.
- * @param revUpperBound The upper bound of revision.
+ * @param revUpperBound Upper bound of revision (inclusive).
* @throws CompactedException If the requested entry was not found and the
{@code revUpperBound} is less than or equal to the last
* {@link #setCompactionRevision compacted} one.
*/
@@ -213,7 +213,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
* Returns entries corresponding to the given keys and bounded by the
given revision.
*
* @param keys Not empty keys.
- * @param revUpperBound Upper bound of revision.
+ * @param revUpperBound Upper bound of revision (inclusive).
* @throws CompactedException If getting any of the individual entries
would have thrown this exception as if
* {@link #get(byte[], long)} was used.
* @see #get(byte[], long)
@@ -285,21 +285,29 @@ public interface KeyValueStorage extends
ManuallyCloseable {
StatementResult invoke(If iif, HybridTimestamp opTs, CommandId commandId);
/**
- * Returns cursor by entries which correspond to the given keys range.
+ * Returns cursor by latest entries which correspond to the given keys
range.
+ *
+ * <p>Cursor will iterate over a snapshot of keys and their revisions at
the time the method was invoked.</p>
+ *
+ * <p>Neither this method nor the cursor methods throw {@link
CompactedException}.</p>
*
* @param keyFrom Start key of range (inclusive).
- * @param keyTo Last key of range (exclusive).
- * @return Cursor by entries which correspond to the given keys range.
+ * @param keyTo Last key of range (exclusive), {@code null} represents an
unbound range.
*/
Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo);
/**
* Returns cursor by entries which correspond to the given keys range and
bounded by revision number.
*
+ * <p>Cursor will iterate over a snapshot of keys and their revisions at
the time the method was invoked.</p>
+ *
+ * <p>Cursor methods never throw {@link CompactedException}.</p>
+ *
* @param keyFrom Start key of range (inclusive).
- * @param keyTo Last key of range (exclusive).
- * @param revUpperBound Upper bound of revision.
- * @return Cursor by entries which correspond to the given keys range.
+ * @param keyTo Last key of range (exclusive), {@code null} represents an
unbound range.
+ * @param revUpperBound Upper bound of revision (inclusive) for each key.
+ * @throws CompactedException If the {@code revUpperBound} is less than or
equal to the last {@link #setCompactionRevision compacted}
+ * one.
*/
Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long
revUpperBound);
@@ -307,7 +315,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
* Creates subscription on updates of entries corresponding to the given
keys range and starting from the given revision number.
*
* @param keyFrom Start key of range (inclusive).
- * @param keyTo Last key of range (exclusive).
+ * @param keyTo Last key of range (exclusive), {@code null} represents an
unbound range.
* @param rev Start revision number.
*/
void watchRange(byte[] keyFrom, byte @Nullable [] keyTo, long rev,
WatchListener listener);
@@ -315,8 +323,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
/**
* Registers a watch listener for the provided key.
*
- * @param key Meta Storage key.
- * @param rev Starting Meta Storage revision.
+ * @param key Key.
+ * @param rev Start revision number.
* @param listener Listener which will be notified for each update.
*/
void watchExact(byte[] key, long rev, WatchListener listener);
@@ -324,8 +332,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
/**
* Registers a watch listener for the provided keys.
*
- * @param keys Meta Storage keys.
- * @param rev Starting Meta Storage revision.
+ * @param keys Not empty keys.
+ * @param rev Start revision number.
* @param listener Listener which will be notified for each update.
*/
void watchExact(Collection<byte[]> keys, long rev, WatchListener listener);
@@ -333,7 +341,7 @@ public interface KeyValueStorage extends ManuallyCloseable {
/**
* Starts all registered watches.
*
- * <p>Before calling this method, watches will not receive any updates.
+ * <p>Before calling this method, watches will not receive any updates.</p>
*
* @param startRevision Revision to start processing updates from.
* @param revisionCallback Callback that will be invoked after all watches
of a particular revision are processed, with the
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
index 705907e08a..f7aaab1171 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorageUtils.java
@@ -21,6 +21,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Arrays.binarySearch;
import java.util.function.LongPredicate;
+import org.jetbrains.annotations.Nullable;
/** Helper class with useful methods and constants for {@link KeyValueStorage}
implementations. */
public class KeyValueStorageUtils {
@@ -110,8 +111,8 @@ public class KeyValueStorageUtils {
*
* @param bytes Bytes.
*/
- public static String toUtf8String(byte[] bytes) {
- return new String(bytes, UTF_8);
+ public static String toUtf8String(byte @Nullable [] bytes) {
+ return bytes == null ? "null" : new String(bytes, UTF_8);
}
/** Asserts that the compaction revision is less than the current storage
revision. */
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index d288d7b6bd..5e792e495f 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -866,7 +866,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
rwLock.readLock().lock();
try {
- return range(keyFrom, keyTo, rev);
+ return doRange(keyFrom, keyTo, rev);
} finally {
rwLock.readLock().unlock();
}
@@ -877,86 +877,7 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
rwLock.readLock().lock();
try {
- var readOpts = new ReadOptions();
-
- var upperBound = keyTo == null ? null : new Slice(keyTo);
-
- readOpts.setIterateUpperBound(upperBound);
-
- RocksIterator iterator = index.newIterator(readOpts);
-
- iterator.seek(keyFrom);
-
- return new RocksIteratorAdapter<>(iterator) {
- /** Cached entry used to filter "empty" values. */
- @Nullable
- private Entry next;
-
- @Override
- public boolean hasNext() {
- if (next != null) {
- return true;
- }
-
- while (next == null && super.hasNext()) {
- Entry nextCandidate = decodeEntry(it.key(),
it.value());
-
- it.next();
-
- if (!nextCandidate.empty()) {
- next = nextCandidate;
-
- return true;
- }
- }
-
- return false;
- }
-
- @Override
- public Entry next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
-
- Entry result = next;
-
- assert result != null;
-
- next = null;
-
- return result;
- }
-
- @Override
- protected Entry decodeEntry(byte[] key, byte[] value) {
- long[] revisions = getAsLongs(value);
-
- long targetRevision = maxRevision(revisions,
revUpperBound);
-
- if (targetRevision == -1) {
- return EntryImpl.empty(key);
- }
-
- // This is not a correct approach for using locks in terms
of compaction correctness (we should block compaction for the
- // whole iteration duration). However, compaction is not
fully implemented yet, so this lock is taken for consistency
- // sake. This part must be rewritten in the future.
- rwLock.readLock().lock();
-
- try {
- return doGetValue(key, targetRevision);
- } finally {
- rwLock.readLock().unlock();
- }
- }
-
- @Override
- public void close() {
- super.close();
-
- RocksUtils.closeAll(readOpts, upperBound);
- }
- };
+ return doRange(keyFrom, keyTo, revUpperBound);
} finally {
rwLock.readLock().unlock();
}
@@ -1236,59 +1157,6 @@ public class RocksDbKeyValueStorage implements
KeyValueStorage {
}
}
- /**
- * Returns maximum revision which must be less or equal to {@code
upperBoundRev}. If there is no such revision then {@code -1} will be
- * returned.
- *
- * @param revs Revisions list.
- * @param upperBoundRev Revision upper bound.
- * @return Maximum revision or {@code -1} if there is no such revision.
- */
- private static long maxRevision(long[] revs, long upperBoundRev) {
- for (int i = revs.length - 1; i >= 0; i--) {
- long rev = revs[i];
-
- if (rev <= upperBoundRev) {
- return rev;
- }
- }
-
- return -1;
- }
-
- /**
- * Gets the value by a key and a revision.
- *
- * @param key Target key.
- * @param revision Target revision.
- * @return Entry.
- */
- private Entry doGetValue(byte[] key, long revision) {
- if (revision == 0) {
- return EntryImpl.empty(key);
- }
-
- byte[] valueBytes;
-
- try {
- valueBytes = data.get(keyToRocksKey(revision, key));
- } catch (RocksDBException e) {
- throw new MetaStorageException(OP_EXECUTION_ERR, e);
- }
-
- if (valueBytes == null || valueBytes.length == 0) {
- return EntryImpl.empty(key);
- }
-
- Value lastVal = bytesToValue(valueBytes);
-
- if (lastVal.tombstone()) {
- return EntryImpl.tombstone(key, revision,
lastVal.operationTimestamp());
- }
-
- return new EntryImpl(key, lastVal.bytes(), revision,
lastVal.operationTimestamp());
- }
-
/**
* Adds an entry to the batch.
*
@@ -1767,4 +1635,87 @@ public class RocksDbKeyValueStorage implements KeyValueStorage {
);
}
}
+
+ private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) {
+ assert revUpperBound >= 0 : revUpperBound;
+
+ CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound, compactionRevision);
+
+ var readOpts = new ReadOptions();
+
+ Slice upperBound = keyTo == null ? null : new Slice(keyTo);
+
+ readOpts.setIterateUpperBound(upperBound);
+
+ RocksIterator iterator = index.newIterator(readOpts);
+
+ iterator.seek(keyFrom);
+
+ return new RocksIteratorAdapter<>(iterator) {
+ /** Cached entry used to filter "empty" values. */
+ private @Nullable Entry next;
+
+ @Override
+ public boolean hasNext() {
+ if (next != null) {
+ return true;
+ }
+
+ while (next == null && super.hasNext()) {
+ Entry nextCandidate = decodeEntry(it.key(), it.value());
+
+ it.next();
+
+ if (!nextCandidate.empty()) {
+ next = nextCandidate;
+
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public Entry next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ Entry result = next;
+
+ assert result != null;
+
+ next = null;
+
+ return result;
+ }
+
+ @Override
+ protected Entry decodeEntry(byte[] key, byte[] keyRevisionsBytes) {
+ long[] keyRevisions = getAsLongs(keyRevisionsBytes);
+
+ int maxRevisionIndex = maxRevisionIndex(keyRevisions, revUpperBound);
+
+ if (maxRevisionIndex == NOT_FOUND) {
+ return EntryImpl.empty(key);
+ }
+
+ long revision = keyRevisions[maxRevisionIndex];
+
+ // According to the compaction algorithm, we will start it locally on a new compaction revision only when all cursors are
+ // completed strictly before it. Therefore, during normal operation, we should not get an error here.
+ Value value = getValueForOperation(key, revision);
+
+ return EntryImpl.toEntry(key, revision, value);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+
+ RocksUtils.closeAll(readOpts, upperBound);
+ }
+ };
+ }
}
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index c3e10a8316..66324d84ce 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -45,6 +45,7 @@ import org.apache.ignite.internal.metastorage.impl.CommandIdGenerator;
import org.apache.ignite.internal.metastorage.server.ExistenceCondition.Type;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.Cursor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -688,6 +689,39 @@ public abstract class AbstractCompactionKeyValueStorageTest extends AbstractKeyV
assertDoesNotThrowsCompactedExceptionForGetList(NOT_EXISTS_KEY, 7, 7);
}
+ @Test
+ void testRangeLatestAndCompaction() {
+ storage.setCompactionRevision(6);
+
+ assertDoesNotThrow(() -> {
+ try (Cursor<Entry> cursor = storage.range(FOO_KEY, null)) {
+ cursor.hasNext();
+ cursor.next();
+ }
+ });
+ }
+
+ @Test
+ void testRangeAndCompaction() {
+ try (Cursor<Entry> cursorBeforeSetCompactionRevision = storage.range(FOO_KEY, null, 5)) {
+ storage.setCompactionRevision(5);
+
+ assertThrows(CompactedException.class, () -> storage.range(FOO_KEY, null, 1));
+ assertThrows(CompactedException.class, () -> storage.range(FOO_KEY, null, 3));
+ assertThrows(CompactedException.class, () -> storage.range(FOO_KEY, null, 5));
+
+ assertDoesNotThrow(() -> {
+ try (Cursor<Entry> range = storage.range(FOO_KEY, null, 6)) {
+ range.hasNext();
+ range.next();
+ }
+ });
+
+ assertDoesNotThrow(cursorBeforeSetCompactionRevision::hasNext);
+ assertDoesNotThrow(cursorBeforeSetCompactionRevision::next);
+ }
+ }
+
private List<Integer> collectRevisions(byte[] key) {
var revisions = new ArrayList<Integer>();
diff --git a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 06bb3e10c0..7d379dedb6 100644
--- a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -386,29 +386,14 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
@Override
public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo) {
synchronized (mux) {
- return range(keyFrom, keyTo, rev);
+ return doRange(keyFrom, keyTo, rev);
}
}
@Override
public Cursor<Entry> range(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) {
synchronized (mux) {
- SortedMap<byte[], List<Long>> subMap = keyTo == null
- ? keysIdx.tailMap(keyFrom)
- : keysIdx.subMap(keyFrom, keyTo);
-
- return subMap.entrySet().stream()
- .map(e -> {
- long targetRevision = maxRevision(e.getValue(), revUpperBound);
-
- if (targetRevision == -1) {
- return EntryImpl.empty(e.getKey());
- }
-
- return doGetValue(e.getKey(), targetRevision);
- })
- .filter(e -> !e.empty())
- .collect(collectingAndThen(toList(), Cursor::fromIterable));
+ return doRange(keyFrom, keyTo, revUpperBound);
}
}
@@ -529,8 +514,10 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return;
}
- HybridTimestamp ts = revToTsMap.get(updatedEntries.get(0).revision());
- assert ts != null;
+ long revision = updatedEntries.get(0).revision();
+
+ HybridTimestamp ts = revToTsMap.get(revision);
+ assert ts != null : revision;
watchProcessor.notifyWatches(List.copyOf(updatedEntries), ts);
@@ -785,48 +772,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return entries;
}
- /**
- * Returns maximum revision which must be less or equal to {@code upperBoundRev}. If there is no such revision then {@code -1} will be
- * returned.
- *
- * @param revs Revisions list.
- * @param upperBoundRev Revision upper bound.
- * @return Appropriate revision or {@code -1} if there is no such revision.
- */
- private static long maxRevision(List<Long> revs, long upperBoundRev) {
- int i = revs.size() - 1;
-
- for (; i >= 0; i--) {
- long rev = revs.get(i);
-
- if (rev <= upperBoundRev) {
- return rev;
- }
- }
-
- return -1;
- }
-
- private Entry doGetValue(byte[] key, long lastRev) {
- if (lastRev == 0) {
- return EntryImpl.empty(key);
- }
-
- NavigableMap<byte[], Value> lastRevVals = revsIdx.get(lastRev);
-
- if (lastRevVals == null || lastRevVals.isEmpty()) {
- return EntryImpl.empty(key);
- }
-
- Value lastVal = lastRevVals.get(key);
-
- if (lastVal.tombstone()) {
- return EntryImpl.tombstone(key, lastRev, lastVal.operationTimestamp());
- }
-
- return new EntryImpl(key, lastVal.bytes(), lastRev, lastVal.operationTimestamp());
- }
-
private void doPut(byte[] key, byte[] bytes, long curRev, HybridTimestamp opTs) {
// Update keysIdx.
List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
@@ -983,4 +928,33 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return value;
}
+
+ private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, long revUpperBound) {
+ assert revUpperBound >= 0 : revUpperBound;
+
+ CompactedException.throwIfRequestedRevisionLessThanOrEqualToCompacted(revUpperBound, compactionRevision);
+
+ SortedMap<byte[], List<Long>> subMap = keyTo == null
+ ? keysIdx.tailMap(keyFrom)
+ : keysIdx.subMap(keyFrom, keyTo);
+
+ return subMap.entrySet().stream()
+ .map(e -> {
+ byte[] key = e.getKey();
+ long[] keyRevisions = toLongArray(e.getValue());
+
+ int maxRevisionIndex = KeyValueStorageUtils.maxRevisionIndex(keyRevisions, revUpperBound);
+
+ if (maxRevisionIndex == NOT_FOUND) {
+ return EntryImpl.empty(key);
+ }
+
+ long revision = keyRevisions[maxRevisionIndex];
+ Value value = getValue(key, revision);
+
+ return EntryImpl.toEntry(key, revision, value);
+ })
+ .filter(e -> !e.empty())
+ .collect(collectingAndThen(toList(), Cursor::fromIterable));
+ }
}