This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a81a07e0d IGNITE-18322 Define scan contract SortedIndexStorage (#1407)
1a81a07e0d is described below

commit 1a81a07e0da50a3d0ba7e1d8b3d911e106464f99
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Mon Dec 12 12:28:42 2022 +0300

    IGNITE-18322 Define scan contract SortedIndexStorage (#1407)
---
 .../tree/AbstractBplusTreePageMemoryTest.java      |  24 +
 .../ignite/internal/pagememory/tree/BplusTree.java |  75 +++
 .../apache/ignite/internal/rocksdb/RocksUtils.java |   3 +
 .../index/AbstractSortedIndexStorageTest.java      | 713 ++++++++++++++++++++-
 .../index/impl/BinaryTupleRowSerializer.java       |   2 +-
 .../storage/index/impl/TestSortedIndexStorage.java | 198 ++++--
 .../index/sorted/PageMemorySortedIndexStorage.java | 100 ++-
 .../pagememory/index/sorted/SortedIndexTree.java   |   7 +
 .../rocksdb/index/RocksDbSortedIndexStorage.java   |  79 ++-
 9 files changed, 1106 insertions(+), 95 deletions(-)

diff --git 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
index b9d7c03739..0983eda760 100644
--- 
a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
+++ 
b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java
@@ -2333,6 +2333,30 @@ public abstract class AbstractBplusTreePageMemoryTest 
extends BaseIgniteAbstract
         }
     }
 
+    /**
+     * Checks {@code findNext} on an empty tree, on a tree with a single row and on a tree with two rows, for both values of the
+     * {@code includeRow} flag.
+     */
+    @Test
+    void testFindNext() throws Exception {
+        TestTree tree = createTestTree(true);
+
+        // Empty tree: nothing is returned regardless of the flag.
+        assertNull(tree.findNext(0L, false));
+        assertNull(tree.findNext(0L, true));
+
+        tree.put(0L);
+
+        // The only row equals the lower bound: it is returned only when the bound itself is included.
+        assertNull(tree.findNext(0L, false));
+        assertEquals(0L, tree.findNext(0L, true));
+
+        tree.put(1L);
+
+        // The bound is present and a greater row exists.
+        assertEquals(1L, tree.findNext(0L, false));
+        assertEquals(0L, tree.findNext(0L, true));
+
+        // The bound is the greatest row in the tree.
+        assertNull(tree.findNext(1L, false));
+        assertEquals(1L, tree.findNext(1L, true));
+
+        // The bound is absent from the tree and smaller than every row: the flag makes no difference.
+        assertEquals(0L, tree.findNext(-1L, false));
+        assertEquals(0L, tree.findNext(-1L, true));
+    }
+
     private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws 
Exception {
         final TestTree tree = createTestTree(canGetRow);
 
diff --git 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
index 15214e04db..c601025f84 100644
--- 
a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
+++ 
b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java
@@ -1572,6 +1572,34 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         return findOne(row, null, null);
     }
 
+    /**
+     * Searches for the row that follows the given lower bound: strictly greater than the bound when {@code includeRow} is
+     * {@code false}, greater than or equal to the bound when {@code includeRow} is {@code true}.
+     *
+     * @param lowerBound Lower bound.
+     * @param includeRow {@code true} if a row equal to the lower bound may be returned.
+     * @return Next row, or {@code null} if there is no such row.
+     * @throws IgniteInternalCheckedException If failed.
+     */
+    public final @Nullable T findNext(L lowerBound, boolean includeRow) throws 
IgniteInternalCheckedException {
+        checkDestroyed();
+
+        GetNext g = new GetNext(lowerBound, includeRow);
+
+        try {
+            doFind(g);
+
+            // Stays null when the lookup did not set a row.
+            return g.nextRow;
+        } catch (CorruptedDataStructureException e) {
+            // Already a corruption error: propagate it without wrapping.
+            throw e;
+        } catch (IgniteInternalCheckedException e) {
+            throw new IgniteInternalCheckedException("Runtime failure on 
lookup next row: " + lowerBound, e);
+        } catch (RuntimeException | AssertionError e) {
+            throw corruptedTreeException("Runtime failure on lookup next row: 
" + lowerBound, e, grpId, g.pageId);
+        } finally {
+            // NOTE(review): presumably re-checked so that a tree destroyed during the lookup is reported - confirm.
+            checkDestroyed();
+        }
+    }
+
     /**
      * Tries to find.
      *
@@ -6222,6 +6250,53 @@ public abstract class BplusTree<L, T extends L> extends 
DataStructure implements
         }
     }
 
+    /**
+     * Class for getting the next row.
+     *
+     * <p>Used by {@code findNext}: after the lookup, {@link #nextRow} holds the row that was read, or {@code null} if none was.
+     */
+    private final class GetNext extends Get {
+        /** Row read by {@link #notFound}, {@code null} if no row was read. */
+        @Nullable
+        private T nextRow;
+
+        /**
+         * Constructor.
+         *
+         * @param row Lower bound of the search.
+         * @param includeRow {@code true} if a row equal to the lower bound may be returned.
+         */
+        private GetNext(L row, boolean includeRow) {
+            super(row, false);
+
+            // The shift is never zero here.
+            // NOTE(review): presumably -1 positions the search just before the row and 1 just after it - confirm against Get.
+            shift = includeRow ? -1 : 1;
+        }
+
+        @Override
+        boolean found(BplusIo<L> io, long pageAddr, int idx, int lvl) {
+            // Must never be called because we always have a shift.
+            throw new IllegalStateException();
+        }
+
+        @Override
+        boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl) 
throws IgniteInternalCheckedException {
+            // A row is read only at level 0, which is asserted to be a leaf below.
+            if (lvl != 0) {
+                return false;
+            }
+
+            int cnt = io.getCount(pageAddr);
+
+            if (cnt == 0) {
+                // Empty tree.
+                assert io.getForward(pageAddr, partId) == 0L;
+            } else {
+                assert io.isLeaf() : io;
+                assert cnt > 0 : cnt;
+                assert idx >= 0 : idx;
+                assert cnt >= idx : "cnt=" + cnt + ", idx=" + idx;
+
+                checkDestroyed();
+
+                // idx == cnt means the position is past the last item of this page: nextRow stays null.
+                if (idx < cnt) {
+                    nextRow = getRow(io, pageAddr, idx);
+                }
+            }
+
+            return true;
+        }
+    }
+
     /**
      * Page handler for basic {@link Get} operation.
      */
diff --git 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
index 28a0a68b47..d6fa91a3f3 100644
--- 
a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
+++ 
b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java
@@ -69,6 +69,9 @@ public class RocksUtils {
     /**
      * Checks the status of the iterator and throws an exception if it is not 
correct.
      *
+     * <p>Check the status first. This operation is guaranteed to throw if an 
internal error has occurred during the iteration. Otherwise,
+     * we've exhausted the data range.
+     *
      * @param it RocksDB iterator.
      * @throws IgniteInternalException if the iterator has an incorrect status.
      */
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
index 3b1d5ebc01..13c2f3123d 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.storage.index;
 
+import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
 import static 
org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.addIndex;
@@ -37,14 +38,20 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -227,14 +234,14 @@ public abstract class AbstractSortedIndexStorageTest {
     }
 
     @Test
-    void testEmpty() throws Exception {
+    void testEmpty() {
         SortedIndexStorage index = 
createIndexStorage(shuffledRandomDefinitions());
 
         assertThat(scan(index, null, null, 0), is(empty()));
     }
 
     @Test
-    void testGet() throws Exception {
+    void testGet() {
         SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_INDEX")
                 .addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done()
                 .addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done()
@@ -267,7 +274,7 @@ public abstract class AbstractSortedIndexStorageTest {
      */
     @ParameterizedTest
     @VariableSource("ALL_TYPES_COLUMN_DEFINITIONS")
-    void testSingleColumnIndex(ColumnDefinition columnDefinition) throws 
Exception {
+    void testSingleColumnIndex(ColumnDefinition columnDefinition) {
         testPutGetRemove(List.of(columnDefinition));
     }
 
@@ -275,7 +282,7 @@ public abstract class AbstractSortedIndexStorageTest {
      * Tests that appending an already existing row does no harm.
      */
     @Test
-    void testPutIdempotence() throws Exception {
+    void testPutIdempotence() {
         SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_INDEX")
                 .addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done()
                 .addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done()
@@ -302,7 +309,7 @@ public abstract class AbstractSortedIndexStorageTest {
      * Tests that it is possible to add rows with the same columns but 
different Row IDs.
      */
     @Test
-    void testMultiplePuts() throws Exception {
+    void testMultiplePuts() {
         SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_INDEX")
                 .addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done()
                 .addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done()
@@ -335,7 +342,7 @@ public abstract class AbstractSortedIndexStorageTest {
      * Tests the {@link SortedIndexStorage#remove} method.
      */
     @Test
-    void testRemove() throws Exception {
+    void testRemove() {
         SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_INDEX")
                 .addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done()
                 .addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done()
@@ -389,7 +396,7 @@ public abstract class AbstractSortedIndexStorageTest {
      * Tests the Put-Get-Remove case when an index is created using all 
possible column in random order.
      */
     @RepeatedTest(5)
-    void testCreateMultiColumnIndex() throws Exception {
+    void testCreateMultiColumnIndex() {
         testPutGetRemove(shuffledDefinitions());
     }
 
@@ -397,7 +404,7 @@ public abstract class AbstractSortedIndexStorageTest {
      * Tests the happy case of the {@link SortedIndexStorage#scan} method.
      */
     @RepeatedTest(5)
-    void testScan() throws Exception {
+    void testScan() {
         SortedIndexStorage indexStorage = 
createIndexStorage(shuffledDefinitions());
 
         List<TestIndexRow> entries = IntStream.range(0, 10)
@@ -434,7 +441,7 @@ public abstract class AbstractSortedIndexStorageTest {
     }
 
     @Test
-    public void testBoundsAndOrder() throws Exception {
+    public void testBoundsAndOrder() {
         ColumnTypeSpec string = ColumnTypeSpec.STRING;
         ColumnTypeSpec int32 = ColumnTypeSpec.INT32;
 
@@ -525,7 +532,7 @@ public abstract class AbstractSortedIndexStorageTest {
      * Tests that an empty range is returned if {@link 
SortedIndexStorage#scan} method is called using overlapping keys.
      */
     @Test
-    void testEmptyRange() throws Exception {
+    void testEmptyRange() {
         List<ColumnDefinition> indexSchema = shuffledRandomDefinitions();
 
         SortedIndexStorage indexStorage = createIndexStorage(indexSchema);
@@ -549,7 +556,7 @@ public abstract class AbstractSortedIndexStorageTest {
 
     @ParameterizedTest
     @VariableSource("ALL_TYPES_COLUMN_DEFINITIONS")
-    void testNullValues(ColumnDefinition columnDefinition) throws Exception {
+    void testNullValues(ColumnDefinition columnDefinition) {
         SortedIndexStorage storage = 
createIndexStorage(List.of(columnDefinition));
 
         TestIndexRow entry1 = TestIndexRow.randomRow(storage);
@@ -579,6 +586,653 @@ public abstract class AbstractSortedIndexStorageTest {
         }
     }
 
+    /**
+     * Checks simple scenarios for a scanning cursor.
+     *
+     * <p>An index with a single INT32 column is filled with values 0..4 and scanned without bounds, with both bounds, with only the
+     * lower bound and with only the upper bound, for every combination of the bound flags.
+     */
+    @Test
+    void testScanSimple() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        for (int i = 0; i < 5; i++) {
+            put(indexStorage, serializer.serializeRow(new Object[]{i}, new 
RowId(TEST_PARTITION)));
+        }
+
+        // Checking without borders.
+        assertThat(
+                scan(indexStorage, null, null, 0, 
AbstractSortedIndexStorageTest::firstArrayElement),
+                contains(0, 1, 2, 3, 4)
+        );
+
+        // Let's check with borders.
+        assertThat(
+                scan(
+                        indexStorage,
+                        serializer.serializeRowPrefix(0),
+                        serializer.serializeRowPrefix(4),
+                        (GREATER_OR_EQUAL | LESS_OR_EQUAL),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(0, 1, 2, 3, 4)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        serializer.serializeRowPrefix(0),
+                        serializer.serializeRowPrefix(4),
+                        (GREATER_OR_EQUAL | LESS),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(0, 1, 2, 3)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        serializer.serializeRowPrefix(0),
+                        serializer.serializeRowPrefix(4),
+                        (GREATER | LESS_OR_EQUAL),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(1, 2, 3, 4)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        serializer.serializeRowPrefix(0),
+                        serializer.serializeRowPrefix(4),
+                        (GREATER | LESS),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(1, 2, 3)
+        );
+
+        // Let's check only with the lower bound.
+        // The LESS / LESS_OR_EQUAL flag is expected to make no difference because the upper bound is null.
+        assertThat(
+                scan(
+                        indexStorage,
+                        serializer.serializeRowPrefix(1),
+                        null,
+                        (GREATER_OR_EQUAL | LESS_OR_EQUAL),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(1, 2, 3, 4)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        serializer.serializeRowPrefix(1),
+                        null,
+                        (GREATER_OR_EQUAL | LESS),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(1, 2, 3, 4)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        serializer.serializeRowPrefix(1),
+                        null,
+                        (GREATER | LESS_OR_EQUAL),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(2, 3, 4)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        serializer.serializeRowPrefix(1),
+                        null,
+                        (GREATER | LESS),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(2, 3, 4)
+        );
+
+        // Let's check only with the upper bound.
+        // The GREATER / GREATER_OR_EQUAL flag is expected to make no difference because the lower bound is null.
+        assertThat(
+                scan(
+                        indexStorage,
+                        null,
+                        serializer.serializeRowPrefix(3),
+                        (GREATER_OR_EQUAL | LESS_OR_EQUAL),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(0, 1, 2, 3)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        null,
+                        serializer.serializeRowPrefix(3),
+                        (GREATER_OR_EQUAL | LESS),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(0, 1, 2)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        null,
+                        serializer.serializeRowPrefix(3),
+                        (GREATER | LESS_OR_EQUAL),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(0, 1, 2, 3)
+        );
+
+        assertThat(
+                scan(
+                        indexStorage,
+                        null,
+                        serializer.serializeRowPrefix(3),
+                        (GREATER | LESS),
+                        AbstractSortedIndexStorageTest::firstArrayElement
+                ),
+                contains(0, 1, 2)
+        );
+    }
+
+    /**
+     * Checks that a row added after the cursor was created, but before the first {@code hasNext} call, is returned by the cursor.
+     */
+    @Test
+    void testScanContractAddRowBeforeInvokeHasNext() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        // The cursor is created over an empty index.
+        Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        // index  =  [0]
+        // cursor = ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, new 
RowId(TEST_PARTITION)));
+
+        // index  = [0]
+        // cursor =    ^ with cached [0]
+        assertTrue(scan.hasNext());
+        // index  = [0]
+        // cursor =    ^ with no cached row
+        assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+        // No more rows: the cursor is finished.
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+    }
+
+    /**
+     * Checks that a cursor for which {@code hasNext} has returned {@code false} stays finished even if a row is added afterwards,
+     * while a cursor that has not finished yet returns a row added after the one it returned last.
+     */
+    @Test
+    void testScanContractAddRowAfterInvokeHasNext() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        // index  =
+        // cursor = ^ already finished
+        assertFalse(scan.hasNext());
+
+        // index  =  [0]
+        // cursor = ^ already finished
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, new 
RowId(TEST_PARTITION)));
+
+        // index  =  [0]
+        // cursor = ^ already finished
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+
+        // A new cursor is needed to see the added row.
+        // index  =  [0]
+        // cursor = ^ no cached row
+        scan = indexStorage.scan(null, null, 0);
+
+        // index  = [0]
+        // cursor =    ^ with cached [0]
+        assertTrue(scan.hasNext());
+        // index  = [0]
+        // cursor =    ^ with no cached row
+        assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+        // index  = [0] [1]
+        // cursor =    ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{1}, new 
RowId(TEST_PARTITION)));
+
+        // index  = [0] [1]
+        // cursor =        ^ with cached [1]
+        assertTrue(scan.hasNext());
+        // index  = [0] [1]
+        // cursor =        ^ with no cached row
+        assertEquals(1, serializer.deserializeColumns(scan.next())[0]);
+
+        // index  = [0] [1]
+        // cursor =        ^ already finished
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+    }
+
+    /**
+     * Checks the cursor when only {@code next} is invoked, without {@code hasNext}: a {@code next} call that failed with
+     * {@link NoSuchElementException} finishes the cursor, so a row added afterwards is not returned by it.
+     */
+    @Test
+    void testScanContractInvokeOnlyNext() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        // index  =
+        // cursor = ^ with no cached row
+        assertThrows(NoSuchElementException.class, scan::next);
+
+        // index  =  [0]
+        // cursor = ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, new 
RowId(TEST_PARTITION)));
+
+        // index  = [0]
+        // cursor =    ^ already finished
+        assertThrows(NoSuchElementException.class, scan::next);
+
+        // index  =  [0]
+        // cursor = ^ no cached row
+        scan = indexStorage.scan(null, null, 0);
+
+        // index  = [0]
+        // cursor =     ^ no cached row
+        assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+        // Repeated calls on the finished cursor keep throwing.
+        assertThrows(NoSuchElementException.class, scan::next);
+        assertThrows(NoSuchElementException.class, scan::next);
+    }
+
+    /**
+     * Checks the cursor when rows are only added during the scan: rows located after the last returned row ([0, r2], [1, r0]) are
+     * returned, rows located before it ([0, r0], [-1, r0]) are not.
+     */
+    @Test
+    void testScanContractAddRowsOnly() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        // Referred to as r0, r1 and r2 in the diagrams below.
+        RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+        RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+        RowId rowId2 = new RowId(TEST_PARTITION, 1, 0);
+
+        Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        // index  =  [0, r1]
+        // cursor = ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+        // index  = [0, r1]
+        // cursor =        ^ with no cached row
+        IndexRow nextRow = scan.next();
+
+        assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+        assertEquals(rowId1, nextRow.rowId());
+
+        // index  = [0, r0] [0, r1] [0, r2]
+        // cursor =                ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId2));
+
+        // index  = [0, r0] [0, r1] [0, r2]
+        // cursor =                        ^ with cached [0, r2]
+        assertTrue(scan.hasNext());
+
+        // index  = [0, r0] [0, r1] [0, r2]
+        // cursor =                        ^ with no cached row
+        nextRow = scan.next();
+
+        assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+        assertEquals(rowId2, nextRow.rowId());
+
+        // index  = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0]
+        // cursor =                                 ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId0));
+        put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId0));
+
+        // index  = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0]
+        // cursor =                                         ^ with cached [1, 
r0]
+        assertTrue(scan.hasNext());
+
+        // index  = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0]
+        // cursor =                                         ^ with no cached 
row
+        nextRow = scan.next();
+
+        assertEquals(1, serializer.deserializeColumns(nextRow)[0]);
+        assertEquals(rowId0, nextRow.rowId());
+
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+    }
+
+    /**
+     * Checks that a finished cursor keeps returning {@code false} from {@code hasNext} and throwing
+     * {@link NoSuchElementException} from {@code next}, regardless of rows added afterwards.
+     */
+    @Test
+    void testScanContractForFinishCursor() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        // index  =
+        // cursor = ^ with no cached row
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+
+        // index  =  [0]
+        // cursor = ^ already finished
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, new 
RowId(TEST_PARTITION, 0, 0)));
+
+        // index  =  [0]
+        // cursor = ^ already finished
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+
+        // Second scenario: a new cursor that finishes after returning a row.
+        scan = indexStorage.scan(null, null, 0);
+
+        // index  = [0]
+        // cursor =    ^ with cached [0, r0]
+        assertTrue(scan.hasNext());
+        // index  = [0]
+        // cursor =    ^ with no cached row
+        assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+        // index  = [0]
+        // cursor =    ^ already finished
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+
+        // index  = [-1] [0]
+        // cursor =     ^ already finished
+        put(indexStorage, serializer.serializeRow(new Object[]{-1}, new 
RowId(TEST_PARTITION, 0, 0)));
+
+        // index  = [-1] [0]
+        // cursor =     ^ already finished
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+    }
+
+    /**
+     * Checks the cursor when rows are added during the scan and only {@code next} is invoked, without {@code hasNext}: rows
+     * located after the last returned row ([0, r1], [1, r2]) are returned, a row located before it ([-1, r2]) is not.
+     */
+    @Test
+    void testScanContractNextMethodOnly() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        // Three distinct row IDs, referred to as r0, r1 and r2 in the diagrams below.
+        // r2 must differ from r1, otherwise the rowId2 assertions below cannot tell them apart.
+        RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+        RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+        RowId rowId2 = new RowId(TEST_PARTITION, 1, 0);
+
+        Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        // index  =  [0, r0]
+        // cursor = ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+
+        // index  = [0, r0]
+        // cursor =        ^ with no cached row
+        IndexRow nextRow = scan.next();
+
+        assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+        assertEquals(rowId0, nextRow.rowId());
+
+        // index  = [0, r0] [0, r1]
+        // cursor =        ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+        // index  = [0, r0] [0, r1]
+        // cursor =                ^ with no cached row
+        nextRow = scan.next();
+
+        assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+        assertEquals(rowId1, nextRow.rowId());
+
+        // index  = [-1, r2] [0, r0] [0, r1] [1, r2]
+        // cursor =                         ^ with no cached row
+        put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId2));
+        put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId2));
+
+        // index  = [-1, r2] [0, r0] [0, r1] [1, r2]
+        // cursor =                                 ^ with no cached row
+        nextRow = scan.next();
+
+        assertEquals(1, serializer.deserializeColumns(nextRow)[0]);
+        assertEquals(rowId2, nextRow.rowId());
+
+        assertThrows(NoSuchElementException.class, scan::next);
+    }
+
+    /**
+     * Checks the cursor when rows are only removed during the scan: a row already cached by {@code hasNext} is still returned by
+     * {@code next} after its removal, while rows removed before the cursor reached them are not returned.
+     */
+    @Test
+    void testScanContractRemoveRowsOnly() {
+        SortedIndexDefinition indexDefinition = 
SchemaBuilders.sortedIndex("TEST_IDX")
+                
.addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new 
BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        // Referred to as r0 and r1 in the diagrams below.
+        RowId rowId0 = new RowId(TEST_PARTITION, 0, 0);
+        RowId rowId1 = new RowId(TEST_PARTITION, 0, 1);
+
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+        put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId0));
+        put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1));
+
+        Cursor<IndexRow> scan = indexStorage.scan(null, null, 0);
+
+        // index  = [0, r0] [0, r1] [1, r0] [2, r1]
+        // cursor =        ^ with cached [0, r0]
+        assertTrue(scan.hasNext());
+
+        // index  =  [0, r1] [1, r0] [2, r1]
+        // cursor = ^ with cached [0, r0]
+        remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0));
+
+        // The cached row is returned although it has already been removed from the index.
+        // index  =  [0, r1] [1, r0] [2, r1]
+        // cursor = ^ with no cached row
+        IndexRow nextRow = scan.next();
+
+        assertEquals(0, serializer.deserializeColumns(nextRow)[0]);
+        assertEquals(rowId0, nextRow.rowId());
+
+        // index  =  [1, r0] [2, r1]
+        // cursor = ^ with no cached row
+        remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1));
+
+        // index  = [1, r0] [2, r1]
+        // cursor =        ^ with cached [1, r0]
+        assertTrue(scan.hasNext());
+
+        // index  = [1, r0] [2, r1]
+        // cursor =        ^ with no cached row
+        nextRow = scan.next();
+
+        assertEquals(1, serializer.deserializeColumns(nextRow)[0]);
+        assertEquals(rowId0, nextRow.rowId());
+
+        // index  = [1, r0]
+        // cursor =        ^ with no cached row
+        remove(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1));
+
+        assertFalse(scan.hasNext());
+        assertThrows(NoSuchElementException.class, scan::next);
+    }
+
+    /**
+     * Checks the scan contract when the last returned row is replaced: after {@code [0]} has been returned and then
+     * replaced by {@code [1]}, the cursor must still return {@code [1]} followed by {@code [2]}.
+     */
+    @Test
+    void testScanContractReplaceRow() {
+        SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX")
+                .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        RowId rowId = new RowId(TEST_PARTITION);
+
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+        put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+        // The cursor is a resource (storage implementations may hold iterators behind it), so it is always closed.
+        try (Cursor<IndexRow> scan = indexStorage.scan(null, null, 0)) {
+            // index  = [0] [2]
+            // cursor =    ^ with no cached row
+            assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+            // Replace 0 -> 1.
+            // index  =  [1] [2]
+            // cursor = ^ with no cached row
+            remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+            put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+
+            // index  = [1] [2]
+            // cursor =    ^ with cached [1]
+            assertTrue(scan.hasNext());
+            // index  = [1] [2]
+            // cursor =    ^ with no cached row
+            assertEquals(1, serializer.deserializeColumns(scan.next())[0]);
+
+            // index  = [1] [2]
+            // cursor =        ^ with cached [2]
+            assertTrue(scan.hasNext());
+            // index  = [1] [2]
+            // cursor =        ^ with no cached row
+            assertEquals(2, serializer.deserializeColumns(scan.next())[0]);
+
+            assertFalse(scan.hasNext());
+            assertThrows(NoSuchElementException.class, scan::next);
+        }
+    }
+
+    /**
+     * Checks the scan contract when a row already cached by {@code hasNext()} is removed from the index before
+     * {@code next()} is called: the cached row must still be returned, and the scan must then continue with the rows
+     * that remain in the index.
+     */
+    @Test
+    void testScanContractRemoveCachedRow() {
+        SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX")
+                .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        RowId rowId = new RowId(TEST_PARTITION);
+
+        // Each cursor gets its own try-with-resources block so that neither of them is left open.
+        // The cursor is intentionally created before any row is inserted.
+        try (Cursor<IndexRow> scan = indexStorage.scan(null, null, 0)) {
+            // index  =  [0] [1]
+            // cursor = ^ with no cached row
+            put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+            put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+
+            // index  = [0] [1]
+            // cursor =    ^ with cached [0]
+            assertTrue(scan.hasNext());
+
+            // index  =  [1]
+            // cursor = ^ with cached [0]
+            remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+
+            // index  =  [1]
+            // cursor = ^ with no cached row
+            assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+            // index  = [1]
+            // cursor =    ^ with cached [1]
+            assertTrue(scan.hasNext());
+
+            // index  =
+            // cursor = ^ with cached [1]
+            remove(indexStorage, serializer.serializeRow(new Object[]{1}, rowId));
+
+            // index  =
+            // cursor = ^ with no cached row
+            assertEquals(1, serializer.deserializeColumns(scan.next())[0]);
+
+            assertFalse(scan.hasNext());
+            assertThrows(NoSuchElementException.class, scan::next);
+        }
+
+        try (Cursor<IndexRow> scan = indexStorage.scan(null, null, 0)) {
+            // index  =  [2]
+            // cursor = ^ with no cached row
+            put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+            // index  = [2]
+            // cursor =    ^ with cached [2]
+            assertTrue(scan.hasNext());
+
+            // index  =
+            // cursor = ^ with cached [2]
+            remove(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+            // index  =
+            // cursor = ^ with no cached row
+            assertEquals(2, serializer.deserializeColumns(scan.next())[0]);
+
+            assertFalse(scan.hasNext());
+            assertThrows(NoSuchElementException.class, scan::next);
+        }
+    }
+
+    /**
+     * Checks the scan contract when the last returned row is removed and a row that sorts before it is added: the new
+     * first row {@code [-1]} must not be returned, and the scan must continue with {@code [2]}.
+     */
+    @Test
+    void testScanContractRemoveNextAndAddFirstRow() {
+        SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX")
+                .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done()
+                .build();
+
+        SortedIndexStorage indexStorage = createIndexStorage(indexDefinition);
+
+        BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor());
+
+        RowId rowId = new RowId(TEST_PARTITION);
+
+        put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+        put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId));
+
+        // The cursor is a resource (storage implementations may hold iterators behind it), so it is always closed.
+        try (Cursor<IndexRow> scan = indexStorage.scan(null, null, 0)) {
+            // index  = [0] [2]
+            // cursor =    ^ with no cached row
+            assertEquals(0, serializer.deserializeColumns(scan.next())[0]);
+
+            // index  = [-1] [2]
+            // cursor =     ^ with no cached row
+            remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId));
+            put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId));
+
+            // index  = [-1] [2]
+            // cursor =         ^ with cached [2]
+            assertTrue(scan.hasNext());
+            // index  = [-1] [2]
+            // cursor =         ^ with no cached row
+            assertEquals(2, serializer.deserializeColumns(scan.next())[0]);
+
+            assertFalse(scan.hasNext());
+            assertThrows(NoSuchElementException.class, scan::next);
+        }
+    }
+
+    /**
+     * Delegates to {@code shuffledDefinitions}, passing a function that ignores its argument and returns
+     * {@code random.nextBoolean()}.
+     * NOTE(review): presumably this selects a random subset of the column definitions - confirm against
+     * {@code shuffledDefinitions}.
+     */
     private List<ColumnDefinition> shuffledRandomDefinitions() {
         return shuffledDefinitions(d -> random.nextBoolean());
     }
@@ -611,7 +1265,7 @@ public abstract class AbstractSortedIndexStorageTest {
      * Tests the Get-Put-Remove scenario: inserts some keys into the storage 
and checks that they have been successfully persisted and can
      * be removed.
      */
-    private void testPutGetRemove(List<ColumnDefinition> indexSchema) throws 
Exception {
+    private void testPutGetRemove(List<ColumnDefinition> indexSchema) {
         SortedIndexStorage indexStorage = createIndexStorage(indexSchema);
 
         TestIndexRow entry1 = TestIndexRow.randomRow(indexStorage);
@@ -644,7 +1298,7 @@ public abstract class AbstractSortedIndexStorageTest {
      * Extracts a single value by a given key or {@code null} if it does not 
exist.
      */
     @Nullable
-    private static IndexRow getSingle(SortedIndexStorage indexStorage, 
BinaryTuple fullPrefix) throws Exception {
+    private static IndexRow getSingle(SortedIndexStorage indexStorage, 
BinaryTuple fullPrefix) {
         List<RowId> rowIds = get(indexStorage, fullPrefix);
 
         assertThat(rowIds, anyOf(empty(), hasSize(1)));
@@ -663,17 +1317,28 @@ public abstract class AbstractSortedIndexStorageTest {
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
             @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags
-    ) throws Exception {
+    ) {
+        return scan(index, lowerBound, upperBound, flags, identity());
+    }
+
+    /**
+     * Scans the index with the given bounds and flags, deserializes the columns of every returned row, applies
+     * {@code mapper} to each column array and collects the results into an unmodifiable list.
+     *
+     * @param index Index storage to scan.
+     * @param lowerBound Lower bound passed to the storage scan, may be {@code null}.
+     * @param upperBound Upper bound passed to the storage scan, may be {@code null}.
+     * @param flags Scan flags passed to the storage scan.
+     * @param mapper Function applied to the deserialized columns of each row.
+     * @return Mapped rows in the order the cursor returned them.
+     */
+    private static <T> List<T> scan(
+            SortedIndexStorage index,
+            @Nullable BinaryTuplePrefix lowerBound,
+            @Nullable BinaryTuplePrefix upperBound,
+            @MagicConstant(flagsFromClass = SortedIndexStorage.class) int 
flags,
+            Function<Object[], T> mapper
+    ) {
         var serializer = new BinaryTupleRowSerializer(index.indexDescriptor());
 
         try (Cursor<IndexRow> cursor = index.scan(lowerBound, upperBound, 
flags)) {
             return cursor.stream()
                     .map(serializer::deserializeColumns)
+                    .map(mapper)
                     .collect(toUnmodifiableList());
         }
     }
 
-    protected static List<RowId> get(SortedIndexStorage index, BinaryTuple 
key) throws Exception {
+    protected static List<RowId> get(SortedIndexStorage index, BinaryTuple 
key) {
         try (Cursor<RowId> cursor = index.get(key)) {
             return cursor.stream().collect(toUnmodifiableList());
         }
@@ -694,4 +1359,22 @@ public abstract class AbstractSortedIndexStorageTest {
             return null;
         });
     }
+
+    /**
+     * Drains the given cursor from its current position, converting every row with {@code mapper}.
+     *
+     * @param scanCursor Cursor to drain; it is not closed by this method.
+     * @param mapper Conversion applied to each row.
+     * @return Converted rows in the order the cursor returned them.
+     */
+    private static <T> List<T> getRemaining(Cursor<IndexRow> scanCursor, Function<IndexRow, T> mapper) {
+        List<T> remaining = new ArrayList<>();
+
+        while (true) {
+            if (!scanCursor.hasNext()) {
+                return remaining;
+            }
+
+            IndexRow row = scanCursor.next();
+
+            remaining.add(mapper.apply(row));
+        }
+    }
+
+    /**
+     * Creates a function that deserializes the columns of an index row using {@code serializer} and returns the
+     * first column cast to the type expected by the caller.
+     *
+     * @param serializer Serializer used to deserialize the columns.
+     */
+    // The cast cannot be checked at runtime: the caller is responsible for requesting the actual column type.
+    @SuppressWarnings("unchecked")
+    private static <T> Function<IndexRow, T> firstColumn(BinaryTupleRowSerializer serializer) {
+        return indexRow -> (T) serializer.deserializeColumns(indexRow)[0];
+    }
+
+    /**
+     * Returns the first element of the array cast to the type expected by the caller.
+     *
+     * @param objects Array with at least one element.
+     */
+    // The cast cannot be checked at runtime: the caller is responsible for requesting the actual element type.
+    @SuppressWarnings("unchecked")
+    private static <T> T firstArrayElement(Object[] objects) {
+        return (T) objects[0];
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
index 9b029d0018..65c6308656 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java
@@ -108,7 +108,7 @@ public class BinaryTupleRowSerializer {
     /**
      * Creates a prefix of an {@link IndexRow} using the provided columns.
      */
-    public BinaryTuplePrefix serializeRowPrefix(Object[] prefixColumnValues) {
+    public BinaryTuplePrefix serializeRowPrefix(Object... prefixColumnValues) {
         if (prefixColumnValues.length > schema.size()) {
             throw new IllegalArgumentException(String.format(
                     "Incorrect number of column values passed. Expected not 
more than %d, got %d",
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
index 4f7f727cb2..bef8d5846d 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java
@@ -17,14 +17,14 @@
 
 package org.apache.ignite.internal.storage.index.impl;
 
-import static org.apache.ignite.internal.util.IgniteUtils.capacity;
+import static java.util.Collections.emptyNavigableMap;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Set;
-import java.util.SortedMap;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
@@ -45,7 +45,13 @@ import org.jetbrains.annotations.Nullable;
  * Test implementation of MV sorted index storage.
  */
 public class TestSortedIndexStorage implements SortedIndexStorage {
-    private final ConcurrentNavigableMap<ByteBuffer, Set<RowId>> index;
+    private static final Object NULL = new Object();
+
+    /**
+     * {@code NavigableMap<RowId, Object>} is used as a {@link NavigableSet}; a map was chosen because methods like
+     * {@link NavigableSet#first()} throw a {@link NoSuchElementException} if the set is empty.
+     */
+    private final ConcurrentNavigableMap<ByteBuffer, NavigableMap<RowId, 
Object>> index;
 
     private final SortedIndexDescriptor descriptor;
 
@@ -68,11 +74,28 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
     public Cursor<RowId> get(BinaryTuple key) throws StorageException {
         checkClosed();
 
-        Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(), 
Set.of()).stream()
-                .peek(rowId -> checkClosed())
-                .iterator();
+        Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(), 
emptyNavigableMap()).keySet().iterator();
+
+        return new Cursor<>() {
+            @Override
+            public void close() {
+                // No-op.
+            }
+
+            @Override
+            public boolean hasNext() {
+                checkClosed();
+
+                return iterator.hasNext();
+            }
 
-        return Cursor.fromBareIterator(iterator);
+            @Override
+            public RowId next() {
+                checkClosed();
+
+                return iterator.next();
+            }
+        };
     }
 
     @Override
@@ -80,18 +103,11 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
         checkClosed();
 
         index.compute(row.indexColumns().byteBuffer(), (k, v) -> {
-            if (v == null) {
-                return Set.of(row.rowId());
-            } else if (v.contains(row.rowId())) {
-                return v;
-            } else {
-                var result = new HashSet<RowId>(capacity(v.size() + 1));
+            NavigableMap<RowId, Object> rowIds = v == null ? new 
ConcurrentSkipListMap<>() : v;
 
-                result.addAll(v);
-                result.add(row.rowId());
+            rowIds.put(row.rowId(), NULL);
 
-                return result;
-            }
+            return rowIds;
         });
     }
 
@@ -100,19 +116,9 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
         checkClosed();
 
         index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> {
-            if (v.contains(row.rowId())) {
-                if (v.size() == 1) {
-                    return null;
-                } else {
-                    var result = new HashSet<>(v);
+            v.remove(row.rowId());
 
-                    result.remove(row.rowId());
-
-                    return result;
-                }
-            } else {
-                return v;
-            }
+            return v.isEmpty() ? null : v;
         });
     }
 
@@ -135,48 +141,23 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
             setEqualityFlag(upperBound);
         }
 
-        SortedMap<ByteBuffer, Set<RowId>> data;
+        NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> navigableMap;
 
         if (lowerBound == null && upperBound == null) {
-            data = index;
+            navigableMap = index;
         } else if (lowerBound == null) {
-            data = index.headMap(upperBound.byteBuffer());
+            navigableMap = index.headMap(upperBound.byteBuffer());
         } else if (upperBound == null) {
-            data = index.tailMap(lowerBound.byteBuffer());
+            navigableMap = index.tailMap(lowerBound.byteBuffer());
         } else {
             try {
-                data = index.subMap(lowerBound.byteBuffer(), 
upperBound.byteBuffer());
+                navigableMap = index.subMap(lowerBound.byteBuffer(), 
upperBound.byteBuffer());
             } catch (IllegalArgumentException e) {
-                data = Collections.emptySortedMap();
+                navigableMap = emptyNavigableMap();
             }
         }
 
-        Iterator<? extends IndexRow> iterator = data.entrySet().stream()
-                .flatMap(e -> {
-                    var tuple = new 
BinaryTuple(descriptor.binaryTupleSchema(), e.getKey());
-
-                    return e.getValue().stream().map(rowId -> new 
IndexRowImpl(tuple, rowId));
-                })
-                .iterator();
-
-        return new Cursor<>() {
-            @Override
-            public void close() {
-                // No-op.
-            }
-
-            @Override
-            public boolean hasNext() {
-                checkClosed();
-
-                return iterator.hasNext();
-            }
-
-            @Override
-            public IndexRow next() {
-                return iterator.next();
-            }
-        };
+        return new ScanCursor(navigableMap);
     }
 
     private static void setEqualityFlag(BinaryTuplePrefix prefix) {
@@ -208,4 +189,93 @@ public class TestSortedIndexStorage implements 
SortedIndexStorage {
             throw new StorageClosedException("Storage is already closed");
         }
     }
+
+    private class ScanCursor implements Cursor<IndexRow> {
+        private final NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> 
indexMap;
+
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private Entry<ByteBuffer, NavigableMap<RowId, Object>> indexMapEntry;
+
+        @Nullable
+        private RowId rowId;
+
+        private ScanCursor(NavigableMap<ByteBuffer, NavigableMap<RowId, 
Object>> indexMap) {
+            this.indexMap = indexMap;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            checkClosed();
+
+            advanceIfNeeded();
+
+            return hasNext;
+        }
+
+        @Override
+        public IndexRow next() {
+            checkClosed();
+
+            advanceIfNeeded();
+
+            boolean hasNext = this.hasNext;
+
+            if (!hasNext) {
+                throw new NoSuchElementException();
+            }
+
+            this.hasNext = null;
+
+            return new IndexRowImpl(new 
BinaryTuple(descriptor.binaryTupleSchema(), indexMapEntry.getKey()), rowId);
+        }
+
+        private void advanceIfNeeded() {
+            if (hasNext != null) {
+                return;
+            }
+
+            if (indexMapEntry == null) {
+                indexMapEntry = indexMap.firstEntry();
+            }
+
+            if (rowId == null) {
+                if (indexMapEntry != null) {
+                    rowId = getRowId(indexMapEntry.getValue().firstEntry());
+                }
+            } else {
+                Entry<RowId, Object> nextRowIdEntry = 
indexMapEntry.getValue().higherEntry(rowId);
+
+                if (nextRowIdEntry != null) {
+                    rowId = nextRowIdEntry.getKey();
+                } else {
+                    Entry<ByteBuffer, NavigableMap<RowId, Object>> 
nextIndexMapEntry = indexMap.higherEntry(indexMapEntry.getKey());
+
+                    if (nextIndexMapEntry == null) {
+                        hasNext = false;
+
+                        return;
+                    } else {
+                        indexMapEntry = nextIndexMapEntry;
+
+                        rowId = 
getRowId(indexMapEntry.getValue().firstEntry());
+                    }
+                }
+            }
+
+            hasNext = rowId != null;
+        }
+
+        @Nullable
+        private RowId getRowId(@Nullable Entry<RowId, ?> rowIdEntry) {
+            return rowIdEntry == null ? null : rowIdEntry.getKey();
+        }
+    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 4923f005b6..b9d4beffeb 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.storage.pagememory.index.sorted;
 import java.lang.invoke.MethodHandles;
 import java.lang.invoke.VarHandle;
 import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
 import java.util.function.Function;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.schema.BinaryTuple;
@@ -175,9 +176,7 @@ public class PageMemorySortedIndexStorage implements 
SortedIndexStorage {
 
             SortedIndexRowKey upper = createBound(upperBound, includeUpper);
 
-            return convertCursor(sortedIndexTree.find(lower, upper), 
this::toIndexRowImpl);
-        } catch (IgniteInternalCheckedException e) {
-            throw new StorageException("Failed to create scan cursor", e);
+            return new ScanCursor(lower, upper);
         } finally {
             closeBusyLock.leaveBusy();
         }
@@ -272,4 +271,99 @@ public class PageMemorySortedIndexStorage implements 
SortedIndexStorage {
             }
         };
     }
+
+    private class ScanCursor implements Cursor<IndexRow> {
+        @Nullable
+        private Boolean hasNext;
+
+        @Nullable
+        private final SortedIndexRowKey lower;
+
+        @Nullable
+        private final SortedIndexRowKey upper;
+
+        @Nullable
+        private SortedIndexRow treeRow;
+
+        private ScanCursor(@Nullable SortedIndexRowKey lower, @Nullable 
SortedIndexRowKey upper) {
+            this.lower = lower;
+            this.upper = upper;
+        }
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (!closeBusyLock.enterBusy()) {
+                throwStorageClosedException();
+            }
+
+            try {
+                advanceIfNeeded();
+
+                return hasNext;
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error while advancing the cursor", 
e);
+            } finally {
+                closeBusyLock.leaveBusy();
+            }
+        }
+
+        @Override
+        public IndexRow next() {
+            if (!closeBusyLock.enterBusy()) {
+                throwStorageClosedException();
+            }
+
+            try {
+                advanceIfNeeded();
+
+                boolean hasNext = this.hasNext;
+
+                if (!hasNext) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return toIndexRowImpl(treeRow);
+            } catch (IgniteInternalCheckedException e) {
+                throw new StorageException("Error while advancing the cursor", 
e);
+            } finally {
+                closeBusyLock.leaveBusy();
+            }
+        }
+
+        private void advanceIfNeeded() throws IgniteInternalCheckedException {
+            if (hasNext != null) {
+                return;
+            }
+
+            if (treeRow == null) {
+                treeRow = lower == null ? sortedIndexTree.findFirst() : 
sortedIndexTree.findNext(lower, true);
+            } else {
+                SortedIndexRow next = sortedIndexTree.findNext(treeRow, false);
+
+                if (next == null) {
+                    hasNext = false;
+
+                    return;
+                } else {
+                    treeRow = next;
+                }
+            }
+
+            hasNext = treeRow != null && (upper == null || 
compareRows(treeRow, upper) < 0);
+        }
+
+        private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey 
key2) {
+            return sortedIndexTree.getBinaryTupleComparator().compare(
+                    key1.indexColumns().valueBuffer(),
+                    key2.indexColumns().valueBuffer()
+            );
+        }
+    }
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java
index 2217081f09..5c21b14843 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java
@@ -164,4 +164,11 @@ public class SortedIndexTree extends 
BplusTree<SortedIndexRowKey, SortedIndexRow
 
         assert result == Boolean.TRUE : result;
     }
+
+    /**
+     * Returns the comparator of index column {@link BinaryTuple}s held by this tree.
+     */
+    BinaryTupleComparator getBinaryTupleComparator() {
+        return this.binaryTupleComparator;
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index fb558047e3..b06cc00a74 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.storage.rocksdb.index;
 
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.CursorUtils.map;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.util.NoSuchElementException;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
 import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
@@ -167,23 +168,77 @@ public class RocksDbSortedIndexStorage implements 
SortedIndexStorage {
 
         RocksIterator it = indexCf.newIterator(options);
 
-        if (lowerBound == null) {
-            it.seek(partitionStorage.partitionStartPrefix());
-        } else {
-            it.seek(lowerBound);
-        }
+        return new Cursor<>() {
+            @Nullable
+            private Boolean hasNext;
+
+            private byte @Nullable [] key;
 
-        return new RocksIteratorAdapter<>(it) {
             @Override
-            protected ByteBuffer decodeEntry(byte[] key, byte[] value) {
-                return ByteBuffer.wrap(key).order(ORDER);
+            public void close() {
+                try {
+                    closeAll(it, options, upperBoundSlice);
+                } catch (Exception e) {
+                    throw new StorageException("Error closing cursor", e);
+                }
             }
 
             @Override
-            public void close() {
-                super.close();
+            public boolean hasNext() {
+                advanceIfNeeded();
+
+                return hasNext;
+            }
+
+            @Override
+            public ByteBuffer next() {
+                advanceIfNeeded();
+
+                boolean hasNext = this.hasNext;
+
+                if (!hasNext) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return ByteBuffer.wrap(key).order(ORDER);
+            }
+
+            private void advanceIfNeeded() throws StorageException {
+                if (hasNext != null) {
+                    return;
+                }
+
+                try {
+                    it.refresh();
+                } catch (RocksDBException e) {
+                    throw new StorageException("Error refreshing an iterator", 
e);
+                }
+
+                if (key == null) {
+                    it.seek(lowerBound == null ? 
partitionStorage.partitionStartPrefix() : lowerBound);
+                } else {
+                    it.seekForPrev(key);
+
+                    if (it.isValid()) {
+                        it.next();
+                    } else {
+                        RocksUtils.checkIterator(it);
+
+                        it.seek(lowerBound == null ? 
partitionStorage.partitionStartPrefix() : lowerBound);
+                    }
+                }
+
+                if (!it.isValid()) {
+                    RocksUtils.checkIterator(it);
+
+                    hasNext = false;
+                } else {
+                    key = it.key();
 
-                RocksUtils.closeAll(options, upperBoundSlice);
+                    hasNext = true;
+                }
             }
         };
     }

Reply via email to