This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c3f3b6a4ce IGNITE-23417 Fix race between compaction and 
KeyValueStorage#range (#4550)
c3f3b6a4ce is described below

commit c3f3b6a4ce6b6a7e11da44fc98ccd7a8e6e57876
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Oct 14 16:38:19 2024 +0300

    IGNITE-23417 Fix race between compaction and KeyValueStorage#range (#4550)
---
 .../metastorage/server/KeyValueStorage.java        |  6 +-
 .../server/persistence/RocksDbKeyValueStorage.java | 21 +++++--
 .../AbstractCompactionKeyValueStorageTest.java     | 69 ++++++++++++++++++++++
 .../server/SimpleInMemoryKeyValueStorage.java      | 23 +++++---
 4 files changed, 106 insertions(+), 13 deletions(-)

diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index ccbaa1d580..92432367e0 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -287,7 +287,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
     /**
      * Returns cursor by latest entries which correspond to the given keys 
range.
      *
-     * <p>Cursor will iterate over a snapshot of keys and their revisions at 
the time the method was invoked.</p>
+     * <p>Cursor will iterate over a snapshot of keys and their revisions at 
the time the method was invoked. Each key is returned as a single entry
+     * with its most recent revision.</p>
      *
      * <p>Never throws {@link CompactedException} as well as cursor 
methods.</p>
      *
@@ -299,7 +300,8 @@ public interface KeyValueStorage extends ManuallyCloseable {
     /**
      * Returns cursor by entries which correspond to the given keys range and 
bounded by revision number.
      *
-     * <p>Cursor will iterate over a snapshot of keys and their revisions at 
the time the method was invoked.</p>
+     * <p>Cursor will iterate over a snapshot of keys and their revisions at 
the time the method was invoked. Each key is returned as a single entry
+     * with its most recent revision less than or equal to the {@code 
revUpperBound}.</p>
      *
      * <p>Cursor methods never throw {@link CompactedException}.</p>
      *
diff --git 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
index 5e792e495f..bf743489de 100644
--- 
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
+++ 
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/persistence/RocksDbKeyValueStorage.java
@@ -111,6 +111,7 @@ import 
org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.rocksdb.snapshot.RocksSnapshotManager;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.ByteUtils;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteUtils;
@@ -1621,12 +1622,20 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
     }
 
     private Value getValueForOperation(byte[] key, long revision) {
+        Value value = getValueForOperationNullable(key, revision);
+
+        assert value != null : "key=" + toUtf8String(key) + ", revision=" + 
revision;
+
+        return value;
+    }
+
+    private @Nullable Value getValueForOperationNullable(byte[] key, long 
revision) {
         try {
             byte[] valueBytes = data.get(keyToRocksKey(revision, key));
 
             assert valueBytes != null && valueBytes.length != 0 : "key=" + 
toUtf8String(key) + ", revision=" + revision;
 
-            return bytesToValue(valueBytes);
+            return ArrayUtils.nullOrEmpty(valueBytes) ? null : 
bytesToValue(valueBytes);
         } catch (RocksDBException e) {
             throw new MetaStorageException(
                     OP_EXECUTION_ERR,
@@ -1651,6 +1660,8 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
 
         iterator.seek(keyFrom);
 
+        long compactionRevisionBeforeCreateCursor = compactionRevision;
+
         return new RocksIteratorAdapter<>(iterator) {
             /** Cached entry used to filter "empty" values. */
             private @Nullable Entry next;
@@ -1702,10 +1713,12 @@ public class RocksDbKeyValueStorage implements 
KeyValueStorage {
                 }
 
                 long revision = keyRevisions[maxRevisionIndex];
+                Value value = getValueForOperationNullable(key, revision);
 
-                // According to the compaction algorithm, we will start it 
locally on a new compaction revision only when all cursors are
-                // completed strictly before it. Therefore, during normal 
operation, we should not get an error here.
-                Value value = getValueForOperation(key, revision);
+                // Value may be null if the compaction has removed it in 
parallel.
+                if (value == null || (revision <= 
compactionRevisionBeforeCreateCursor && value.tombstone())) {
+                    return EntryImpl.empty(key);
+                }
 
                 return EntryImpl.toEntry(key, revision, value);
             }
diff --git 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
index 66324d84ce..956382ca5b 100644
--- 
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
+++ 
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractCompactionKeyValueStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.metastorage.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.util.stream.Collectors.joining;
+import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.ops;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
@@ -722,6 +723,70 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         }
     }
 
+    /**
+     * Tests {@link KeyValueStorage#range(byte[], byte[])} and {@link 
KeyValueStorage#range(byte[], byte[], long)} for the case when
+     * cursors should or should not return the last element after compaction. 
For {@link #FOO_KEY} and {@link #BAR_KEY}, the latest revision
+     * is 5: a regular entry for the former and a tombstone for the latter. Keys with their revisions are 
added in {@link #setUp()}.
+     *
+     * <p>Consider the situation:</p>
+     * <ul>
+     *     <li>Made {@link KeyValueStorage#setCompactionRevision(long) 
KeyValueStorage.setCompactionRevision(5)}.</li>
+     *     <li>Waited for all cursors to end with revision {@code 5} or 
less.</li>
+     *     <li>Made {@link KeyValueStorage#compact(long) 
KeyValueStorage.compact(5)}.</li>
+     *     <li>Invoked {@link KeyValueStorage#range} for the latest revision and 
for revision {@code 6} for {@link #FOO_KEY} and {@link #BAR_KEY}.
+     *     <ul>
+     *         <li>For {@link #FOO_KEY}, we need to return an entry with 
revision {@code 5}, since it will not be removed from the storage
+     *         after compaction.</li>
+     *         <li>For {@link #BAR_KEY}, we should not return anything, since 
the key will be deleted after compaction.</li>
+     *     </ul>
+     *     </li>
+     * </ul>
+     */
+    @Test
+    void testRangeAndCompactionForCaseReadLastEntries() {
+        storage.setCompactionRevision(5);
+
+        try (
+                Cursor<Entry> rangeFooKeyCursorLatest = storage.range(FOO_KEY, 
storage.nextKey(FOO_KEY));
+                Cursor<Entry> rangeFooKeyCursorBounded = 
storage.range(FOO_KEY, storage.nextKey(FOO_KEY), 6);
+                Cursor<Entry> rangeBarKeyCursorLatest = storage.range(BAR_KEY, 
storage.nextKey(BAR_KEY));
+                Cursor<Entry> rangeBarKeyCursorBounded = 
storage.range(BAR_KEY, storage.nextKey(BAR_KEY), 6)
+        ) {
+            // Must see the latest revision of the FOO_KEY as it will not be 
removed from storage by the compaction.
+            assertEquals(List.of(5L), 
collectRevisions(rangeFooKeyCursorLatest));
+            assertEquals(List.of(5L), 
collectRevisions(rangeFooKeyCursorBounded));
+
+            // Must not see the latest revision of the BAR_KEY, as it will 
have to be removed from the storage by the compaction.
+            assertEquals(List.of(), collectRevisions(rangeBarKeyCursorLatest));
+            assertEquals(List.of(), 
collectRevisions(rangeBarKeyCursorBounded));
+        }
+    }
+
+    /**
+     * Tests {@link KeyValueStorage#range(byte[], byte[])} and {@link 
KeyValueStorage#range(byte[], byte[], long)} for the case when they
+     * were invoked on a revision (for example, on revision {@code 5}) before 
invoking
+     * {@link KeyValueStorage#setCompactionRevision(long) 
KeyValueStorage.setCompactionRevision(5)} and are read after it, but before invoking
+     * {@link KeyValueStorage#compact(long) KeyValueStorage.compact(5)}. Such 
cursors should return entries since nothing should be
+     * removed yet until they are completed. Keys are chosen for convenience. 
Keys with their revisions are added in {@link #setUp()}.
+     */
+    @Test
+    void testRangeAfterSetCompactionRevisionButBeforeStartCompaction() {
+        try (
+                Cursor<Entry> rangeFooKeyCursorLatest = storage.range(FOO_KEY, 
storage.nextKey(FOO_KEY));
+                Cursor<Entry> rangeFooKeyCursorBounded = 
storage.range(FOO_KEY, storage.nextKey(FOO_KEY), 5);
+                Cursor<Entry> rangeBarKeyCursorLatest = storage.range(BAR_KEY, 
storage.nextKey(BAR_KEY));
+                Cursor<Entry> rangeBarKeyCursorBounded = 
storage.range(BAR_KEY, storage.nextKey(BAR_KEY), 5)
+        ) {
+            storage.setCompactionRevision(5);
+
+            assertEquals(List.of(5L), 
collectRevisions(rangeFooKeyCursorLatest));
+            assertEquals(List.of(5L), 
collectRevisions(rangeFooKeyCursorBounded));
+
+            assertEquals(List.of(5L), 
collectRevisions(rangeBarKeyCursorLatest));
+            assertEquals(List.of(5L), 
collectRevisions(rangeBarKeyCursorBounded));
+        }
+    }
+
     private List<Integer> collectRevisions(byte[] key) {
         var revisions = new ArrayList<Integer>();
 
@@ -736,6 +801,10 @@ public abstract class 
AbstractCompactionKeyValueStorageTest extends AbstractKeyV
         return revisions;
     }
 
+    private List<Long> collectRevisions(Cursor<Entry> cursor) {
+        return cursor.stream().map(Entry::revision).collect(toList());
+    }
+
     private static byte[] fromString(String s) {
         return s.getBytes(UTF_8);
     }
diff --git 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 7d379dedb6..fad0672cb5 100644
--- 
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ 
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -918,17 +918,21 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
     }
 
     private Value getValue(byte[] key, long revision) {
-        NavigableMap<byte[], Value> valueByKey = revsIdx.get(revision);
-
-        assert valueByKey != null : "key=" + toUtf8String(key) + ", revision=" 
+ revision;
-
-        Value value = valueByKey.get(key);
+        Value value = getValueNullable(key, revision);
 
         assert value != null : "key=" + toUtf8String(key) + ", revision=" + 
revision;
 
         return value;
     }
 
+    private @Nullable Value getValueNullable(byte[] key, long revision) {
+        NavigableMap<byte[], Value> valueByKey = revsIdx.get(revision);
+
+        assert valueByKey != null : "key=" + toUtf8String(key) + ", revision=" 
+ revision;
+
+        return valueByKey.get(key);
+    }
+
     private Cursor<Entry> doRange(byte[] keyFrom, byte @Nullable [] keyTo, 
long revUpperBound) {
         assert revUpperBound >= 0 : revUpperBound;
 
@@ -943,14 +947,19 @@ public class SimpleInMemoryKeyValueStorage implements 
KeyValueStorage {
                     byte[] key = e.getKey();
                     long[] keyRevisions = toLongArray(e.getValue());
 
-                    int maxRevisionIndex = 
KeyValueStorageUtils.maxRevisionIndex(keyRevisions, revUpperBound);
+                    int maxRevisionIndex = maxRevisionIndex(keyRevisions, 
revUpperBound);
 
                     if (maxRevisionIndex == NOT_FOUND) {
                         return EntryImpl.empty(key);
                     }
 
                     long revision = keyRevisions[maxRevisionIndex];
-                    Value value = getValue(key, revision);
+                    Value value = getValueNullable(key, revision);
+
+                    // Value may be null if the compaction has removed it in 
parallel.
+                    if (value == null || (revision <= compactionRevision && 
value.tombstone())) {
+                        return EntryImpl.empty(key);
+                    }
 
                     return EntryImpl.toEntry(key, revision, value);
                 })

Reply via email to