This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 28afe43525 IGNITE-19422 Fixed "get" method in index storages. (#2161)
28afe43525 is described below

commit 28afe435258f1c1a1f4055f18b136ba0a1573f2b
Author: Ivan Bessonov <bessonov...@gmail.com>
AuthorDate: Thu Jun 8 12:54:08 2023 +0300

    IGNITE-19422 Fixed "get" method in index storages. (#2161)
---
 .../storage/index/AbstractIndexStorageTest.java    |   3 -
 .../index/AbstractPageMemoryIndexStorage.java      | 127 +++++++++++++-
 .../IndexRowKey.java}                              |  22 +--
 .../pagememory/index/hash/HashIndexRowKey.java     |   9 +-
 .../index/hash/PageMemoryHashIndexStorage.java     |  47 ++----
 .../index/sorted/PageMemorySortedIndexStorage.java | 184 ++++-----------------
 .../pagememory/index/sorted/SortedIndexRowKey.java |   9 +-
 .../rocksdb/index/AbstractRocksDbIndexStorage.java | 143 ++++++++++++++++
 .../rocksdb/index/RocksDbHashIndexStorage.java     |  47 +-----
 .../rocksdb/index/RocksDbSortedIndexStorage.java   | 127 +-------------
 10 files changed, 334 insertions(+), 384 deletions(-)

diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
index 6f6090f8e4..0148c839d6 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
@@ -60,7 +60,6 @@ import 
org.apache.ignite.internal.storage.index.impl.BinaryTupleRowSerializer;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
@@ -204,7 +203,6 @@ public abstract class AbstractIndexStorageTest<S extends 
IndexStorage, D extends
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19422")
     public void testGetConcurrentPut() {
         S index = createIndexStorage(INDEX_NAME, ColumnType.INT32, 
ColumnType.string());
         var serializer = new BinaryTupleRowSerializer(indexDescriptor(index));
@@ -230,7 +228,6 @@ public abstract class AbstractIndexStorageTest<S extends 
IndexStorage, D extends
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-19422")
     public void testGetConcurrentReplace() {
         S index = createIndexStorage(INDEX_NAME, ColumnType.INT32, 
ColumnType.string());
         var serializer = new BinaryTupleRowSerializer(indexDescriptor(index));
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
index cb7c502935..a3fcfcee45 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/AbstractPageMemoryIndexStorage.java
@@ -22,13 +22,17 @@ import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 
+import java.util.NoSuchElementException;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.pagememory.tree.BplusTree;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.PeekCursor;
+import org.apache.ignite.internal.storage.pagememory.index.common.IndexRowKey;
 import 
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumnsFreeList;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMeta;
 import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaKey;
@@ -43,7 +47,7 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Abstract index storage based on Page Memory.
  */
-public abstract class AbstractPageMemoryIndexStorage implements IndexStorage {
+public abstract class AbstractPageMemoryIndexStorage<K extends IndexRowKey, V 
extends K> implements IndexStorage {
     /** Index ID. */
     private final int indexId;
 
@@ -192,6 +196,127 @@ public abstract class AbstractPageMemoryIndexStorage 
implements IndexStorage {
         state.compareAndSet(StorageState.CLEANUP, StorageState.RUNNABLE);
     }
 
+    /** Constant that represents the absence of value in {@link ScanCursor}. 
Not equivalent to {@code null} value. */
+    private static final IndexRowKey NO_INDEX_ROW = () -> null;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     *
+     * @param <R> Type of the returned value.
+     */
+    protected abstract class ScanCursor<R> implements PeekCursor<R> {
+        private final BplusTree<K, V> indexTree;
+
+        private final @Nullable K lower;
+
+        private @Nullable Boolean hasNext;
+
+        /**
+         * Last row used in mapping in the {@link #next()} call.
+         * {@code null} upon cursor creation or after {@link #hasNext()} 
returned {@code false}.
+         */
+        private @Nullable V treeRow;
+
+        /**
+         * Row used in the mapping of the latest {@link #peek()} call, that 
was performed after the last {@link #next()} call.
+         * {@link #NO_INDEX_ROW} if there was no such call.
+         */
+        private @Nullable V peekedRow = (V) NO_INDEX_ROW;
+
+        protected ScanCursor(@Nullable K lower, BplusTree<K, V> indexTree) {
+            this.lower = lower;
+            this.indexTree = indexTree;
+        }
+
+        /**
+         * Maps value from the index tree into the required result.
+         */
+        protected abstract R map(V value);
+
+        /**
+         * Check whether the passed value exceeds the upper bound for the scan.
+         */
+        protected abstract boolean exceedsUpperBound(V value);
+
+        @Override
+        public void close() {
+            // No-op.
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(() -> {
+                try {
+                    return advanceIfNeededBusy();
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the 
cursor", e);
+                }
+            });
+        }
+
+        @Override
+        public R next() {
+            return busy(() -> {
+                try {
+                    if (!advanceIfNeededBusy()) {
+                        throw new NoSuchElementException();
+                    }
+
+                    this.hasNext = null;
+
+                    return map(treeRow);
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error while advancing the 
cursor", e);
+                }
+            });
+        }
+
+        @Override
+        public @Nullable R peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractPageMemoryIndexStorage.this::createStorageInfo);
+
+                try {
+                    return map(peekBusy());
+                } catch (IgniteInternalCheckedException e) {
+                    throw new StorageException("Error when peeking next 
element", e);
+                }
+            });
+        }
+
+        private @Nullable V peekBusy() throws IgniteInternalCheckedException {
+            if (hasNext != null) {
+                return treeRow;
+            }
+
+            if (treeRow == null) {
+                peekedRow = lower == null ? indexTree.findFirst() : 
indexTree.findNext(lower, true);
+            } else {
+                peekedRow = indexTree.findNext(treeRow, false);
+            }
+
+            if (peekedRow != null && exceedsUpperBound(peekedRow)) {
+                peekedRow = null;
+            }
+
+            return peekedRow;
+        }
+
+        private boolean advanceIfNeededBusy() throws 
IgniteInternalCheckedException {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractPageMemoryIndexStorage.this::createStorageInfo);
+
+            if (hasNext != null) {
+                return hasNext;
+            }
+
+            treeRow = (peekedRow == NO_INDEX_ROW) ? peekBusy() : peekedRow;
+            peekedRow = (V) NO_INDEX_ROW;
+
+            hasNext = treeRow != null;
+            return hasNext;
+        }
+    }
+
     protected <V> V busy(Supplier<V> supplier) {
         if (!busyLock.enterBusy()) {
             throwExceptionDependingOnStorageState(state.get(), 
createStorageInfo());
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/common/IndexRowKey.java
similarity index 65%
copy from 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
copy to 
modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/common/IndexRowKey.java
index efecee91e5..37f2b41c5e 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/common/IndexRowKey.java
@@ -15,29 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.storage.pagememory.index.sorted;
+package org.apache.ignite.internal.storage.pagememory.index.common;
 
 import 
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
 
 /**
- * Key to search for a {@link SortedIndexRow} in the {@link SortedIndexTree}.
+ * Common interface for search keys in index trees.
  */
-public class SortedIndexRowKey {
-    private final IndexColumns indexColumns;
-
-    /**
-     * Constructor.
-     *
-     * @param indexColumns Index columns.
-     */
-    public SortedIndexRowKey(IndexColumns indexColumns) {
-        this.indexColumns = indexColumns;
-    }
-
+@FunctionalInterface
+public interface IndexRowKey {
     /**
      * Returns an indexed columns value.
      */
-    public IndexColumns indexColumns() {
-        return indexColumns;
-    }
+    IndexColumns indexColumns();
 }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/HashIndexRowKey.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/HashIndexRowKey.java
index 74dab15fb4..ed1e0068d7 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/HashIndexRowKey.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/HashIndexRowKey.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.internal.storage.pagememory.index.hash;
 
+import org.apache.ignite.internal.storage.pagememory.index.common.IndexRowKey;
 import 
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
 
 /**
  * Key to search for a {@link HashIndexRow} in the {@link HashIndexTree}.
  */
-public class HashIndexRowKey {
+public class HashIndexRowKey implements IndexRowKey {
     private final int indexColumnsHash;
 
     private final IndexColumns indexColumns;
@@ -33,7 +34,7 @@ public class HashIndexRowKey {
      * @param indexColumnsHash Hash of the index columns.
      * @param indexColumns Index columns.
      */
-    public HashIndexRowKey(int indexColumnsHash, IndexColumns indexColumns) {
+    HashIndexRowKey(int indexColumnsHash, IndexColumns indexColumns) {
         this.indexColumnsHash = indexColumnsHash;
 
         this.indexColumns = indexColumns;
@@ -46,9 +47,7 @@ public class HashIndexRowKey {
         return indexColumnsHash;
     }
 
-    /**
-     * Returns an indexed columns value.
-     */
+    @Override
     public IndexColumns indexColumns() {
         return indexColumns;
     }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
index 26feeb34cb..1d70bdf1d4 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/hash/PageMemoryHashIndexStorage.java
@@ -20,6 +20,7 @@ package 
org.apache.ignite.internal.storage.pagememory.index.hash;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 
+import java.util.Objects;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.pagememory.util.GradualTaskExecutor;
@@ -41,7 +42,7 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
 /**
  * Implementation of Hash index storage using Page Memory.
  */
-public class PageMemoryHashIndexStorage extends AbstractPageMemoryIndexStorage 
implements HashIndexStorage {
+public class PageMemoryHashIndexStorage extends 
AbstractPageMemoryIndexStorage<HashIndexRowKey, HashIndexRow> implements 
HashIndexStorage {
     private static final IgniteLogger LOG = 
Loggers.forClass(PageMemoryHashIndexStorage.class);
 
     /** Index descriptor. */
@@ -82,41 +83,21 @@ public class PageMemoryHashIndexStorage extends 
AbstractPageMemoryIndexStorage i
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-            try {
-                IndexColumns indexColumns = new IndexColumns(partitionId, 
key.byteBuffer());
-
-                HashIndexRow lowerBound = new HashIndexRow(indexColumns, 
lowestRowId);
-                HashIndexRow upperBound = new HashIndexRow(indexColumns, 
highestRowId);
-
-                Cursor<HashIndexRow> cursor = hashIndexTree.find(lowerBound, 
upperBound);
-
-                return new Cursor<RowId>() {
-                    @Override
-                    public void close() {
-                        cursor.close();
-                    }
-
-                    @Override
-                    public boolean hasNext() {
-                        return busy(() -> {
-                            
throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemoryHashIndexStorage.this::createStorageInfo);
+            IndexColumns indexColumns = new IndexColumns(partitionId, 
key.byteBuffer());
 
-                            return cursor.hasNext();
-                        });
-                    }
+            HashIndexRow lowerBound = new HashIndexRow(indexColumns, 
lowestRowId);
 
-                    @Override
-                    public RowId next() {
-                        return busy(() -> {
-                            
throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemoryHashIndexStorage.this::createStorageInfo);
+            return new ScanCursor<RowId>(lowerBound, hashIndexTree) {
+                @Override
+                protected RowId map(HashIndexRow value) {
+                    return value.rowId();
+                }
 
-                            return cursor.next().rowId();
-                        });
-                    }
-                };
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Failed to create scan cursor", e);
-            }
+                @Override
+                protected boolean exceedsUpperBound(HashIndexRow value) {
+                    return !Objects.equals(value.indexColumns().valueBuffer(), 
key.byteBuffer());
+                }
+            };
         });
     }
 
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
index 31c4ae8135..fb0079fa9e 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java
@@ -21,8 +21,7 @@ import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptio
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
 
 import java.nio.ByteBuffer;
-import java.util.NoSuchElementException;
-import java.util.function.Function;
+import java.util.Objects;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -31,7 +30,6 @@ import org.apache.ignite.internal.pagememory.util.PageIdUtils;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.StorageClosedException;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexRowImpl;
@@ -50,7 +48,8 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Implementation of Sorted index storage using Page Memory.
  */
-public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage implements SortedIndexStorage {
+public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage<SortedIndexRowKey, SortedIndexRow>
+        implements SortedIndexStorage {
     private static final IgniteLogger LOG = 
Loggers.forClass(PageMemorySortedIndexStorage.class);
 
     /** Index descriptor. */
@@ -91,15 +90,19 @@ public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage
         return busy(() -> {
             throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-            try {
-                SortedIndexRowKey lowerBound = toSortedIndexRow(key, 
lowestRowId);
+            SortedIndexRowKey lowerBound = toSortedIndexRow(key, lowestRowId);
 
-                SortedIndexRowKey upperBound = toSortedIndexRow(key, 
highestRowId);
+            return new ScanCursor<RowId>(lowerBound, sortedIndexTree) {
+                @Override
+                protected RowId map(SortedIndexRow value) {
+                    return value.rowId();
+                }
 
-                return convertCursor(sortedIndexTree.find(lowerBound, 
upperBound), SortedIndexRow::rowId);
-            } catch (IgniteInternalCheckedException e) {
-                throw new StorageException("Failed to create scan cursor", e);
-            }
+                @Override
+                protected boolean exceedsUpperBound(SortedIndexRow value) {
+                    return !Objects.equals(value.indexColumns().valueBuffer(), 
key.byteBuffer());
+                }
+            };
         });
     }
 
@@ -154,7 +157,20 @@ public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage
 
             SortedIndexRowKey upper = createBound(upperBound, includeUpper);
 
-            return new ScanCursor(lower, upper);
+            return new ScanCursor<IndexRow>(lower, sortedIndexTree) {
+                @Override
+                public IndexRow map(SortedIndexRow value) {
+                    return toIndexRowImpl(value);
+                }
+
+                @Override
+                protected boolean exceedsUpperBound(SortedIndexRow value) {
+                    return upper != null && 0 <= 
sortedIndexTree.getBinaryTupleComparator().compare(
+                            value.indexColumns().valueBuffer(),
+                            upper.indexColumns().valueBuffer()
+                    );
+                }
+            };
         });
     }
 
@@ -190,150 +206,6 @@ public class PageMemorySortedIndexStorage extends 
AbstractPageMemoryIndexStorage
         sortedIndexTree.close();
     }
 
-    /**
-     * Returns a new cursor that converts elements to another type, and also 
throws {@link StorageClosedException} on
-     * {@link Cursor#hasNext()} and {@link Cursor#next()} when the sorted 
index storage is {@link #close()}.
-     *
-     * @param cursor Cursor.
-     * @param mapper Conversion function.
-     */
-    private <T, R> Cursor<R> convertCursor(Cursor<T> cursor, Function<T, R> 
mapper) {
-        return new Cursor<>() {
-            @Override
-            public void close() {
-                cursor.close();
-            }
-
-            @Override
-            public boolean hasNext() {
-                return busy(() -> {
-                    throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemorySortedIndexStorage.this::createStorageInfo);
-
-                    return cursor.hasNext();
-                });
-            }
-
-            @Override
-            public R next() {
-                return busy(() -> {
-                    throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemorySortedIndexStorage.this::createStorageInfo);
-
-                    return mapper.apply(cursor.next());
-                });
-            }
-        };
-    }
-
-    /** Constant that represents the absence of value in {@link ScanCursor}. */
-    private static final SortedIndexRow NO_INDEX_ROW = new 
SortedIndexRow(null, null);
-
-    private class ScanCursor implements PeekCursor<IndexRow> {
-        @Nullable
-        private Boolean hasNext;
-
-        @Nullable
-        private final SortedIndexRowKey lower;
-
-        @Nullable
-        private final SortedIndexRowKey upper;
-
-        @Nullable
-        private SortedIndexRow treeRow;
-
-        @Nullable
-        private SortedIndexRow peekedRow = NO_INDEX_ROW;
-
-        private ScanCursor(@Nullable SortedIndexRowKey lower, @Nullable 
SortedIndexRowKey upper) {
-            this.lower = lower;
-            this.upper = upper;
-        }
-
-        @Override
-        public void close() {
-            // No-op.
-        }
-
-        @Override
-        public boolean hasNext() {
-            return busy(() -> {
-                try {
-                    return advanceIfNeeded();
-                } catch (IgniteInternalCheckedException e) {
-                    throw new StorageException("Error while advancing the 
cursor", e);
-                }
-            });
-        }
-
-        @Override
-        public IndexRow next() {
-            return busy(() -> {
-                try {
-                    if (!advanceIfNeeded()) {
-                        throw new NoSuchElementException();
-                    }
-
-                    this.hasNext = null;
-
-                    return toIndexRowImpl(treeRow);
-                } catch (IgniteInternalCheckedException e) {
-                    throw new StorageException("Error while advancing the 
cursor", e);
-                }
-            });
-        }
-
-        @Override
-        public @Nullable IndexRow peek() {
-            return busy(() -> {
-                throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemorySortedIndexStorage.this::createStorageInfo);
-
-                try {
-                    return toIndexRowImpl(peekBusy());
-                } catch (IgniteInternalCheckedException e) {
-                    throw new StorageException("Error when peeking next 
element", e);
-                }
-            });
-        }
-
-        private @Nullable SortedIndexRow peekBusy() throws 
IgniteInternalCheckedException {
-            if (hasNext != null) {
-                return treeRow;
-            }
-
-            if (treeRow == null) {
-                peekedRow = lower == null ? sortedIndexTree.findFirst() : 
sortedIndexTree.findNext(lower, true);
-            } else {
-                peekedRow = sortedIndexTree.findNext(treeRow, false);
-            }
-
-            if (peekedRow == null || (upper != null && compareRows(peekedRow, 
upper) >= 0)) {
-                peekedRow = null;
-            }
-
-            return peekedRow;
-        }
-
-        private boolean advanceIfNeeded() throws 
IgniteInternalCheckedException {
-            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
PageMemorySortedIndexStorage.this::createStorageInfo);
-
-            if (hasNext != null) {
-                return hasNext;
-            }
-
-            treeRow = (peekedRow == NO_INDEX_ROW) ? peekBusy() : peekedRow;
-            peekedRow = NO_INDEX_ROW;
-
-            hasNext = treeRow != null;
-            return hasNext;
-        }
-
-        private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey 
key2) {
-            return sortedIndexTree.getBinaryTupleComparator().compare(
-                    key1.indexColumns().valueBuffer(),
-                    key2.indexColumns().valueBuffer()
-            );
-        }
-    }
-
     /**
      * Updates the internal data structures of the storage on rebalance or 
cleanup.
      *
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
index efecee91e5..e2be01651e 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexRowKey.java
@@ -17,12 +17,13 @@
 
 package org.apache.ignite.internal.storage.pagememory.index.sorted;
 
+import org.apache.ignite.internal.storage.pagememory.index.common.IndexRowKey;
 import 
org.apache.ignite.internal.storage.pagememory.index.freelist.IndexColumns;
 
 /**
  * Key to search for a {@link SortedIndexRow} in the {@link SortedIndexTree}.
  */
-public class SortedIndexRowKey {
+public class SortedIndexRowKey implements IndexRowKey {
     private final IndexColumns indexColumns;
 
     /**
@@ -30,13 +31,11 @@ public class SortedIndexRowKey {
      *
      * @param indexColumns Index columns.
      */
-    public SortedIndexRowKey(IndexColumns indexColumns) {
+    SortedIndexRowKey(IndexColumns indexColumns) {
         this.indexColumns = indexColumns;
     }
 
-    /**
-     * Returns an indexed columns value.
-     */
+    @Override
     public IndexColumns indexColumns() {
         return indexColumns;
     }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
index f51148e7ce..157432fd58 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/AbstractRocksDbIndexStorage.java
@@ -17,22 +17,35 @@
 
 package org.apache.ignite.internal.storage.rocksdb.index;
 
+import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.KEY_BYTE_ORDER;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageState;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionDependingOnStorageStateOnRebalance;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
+import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
+import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 
+import java.nio.ByteBuffer;
+import java.util.NoSuchElementException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
+import org.apache.ignite.internal.rocksdb.ColumnFamily;
+import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.PeekCursor;
 import org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
 import org.apache.ignite.internal.storage.util.StorageState;
+import org.apache.ignite.internal.util.ArrayUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.lang.IgniteStringFormatter;
 import org.jetbrains.annotations.Nullable;
+import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.Slice;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteBatchWithIndex;
 
@@ -201,4 +214,134 @@ abstract class AbstractRocksDbIndexStorage implements 
IndexStorage {
      * @throws RocksDBException If failed to delete data.
      */
     abstract void destroyData(WriteBatch writeBatch) throws RocksDBException;
+
+    /**
+     * Cursor that always returns up-to-date next element.
+     */
+    protected abstract class UpToDatePeekCursor<T> implements PeekCursor<T> {
+        private final Slice upperBoundSlice;
+        private final byte[] lowerBound;
+
+        private final ReadOptions options;
+        private final RocksIterator it;
+
+        private @Nullable Boolean hasNext;
+
+        /**
+         * Last key used in mapping in the {@link #next()} call.
+         * {@code null} upon cursor creation or after {@link #hasNext()} 
returned {@code false}.
+         */
+        private byte @Nullable [] key;
+
+        /**
+         * Key used in the mapping of the latest {@link #peek()} call that 
was performed after the last {@link #next()} call.
+         * {@link ArrayUtils#BYTE_EMPTY_ARRAY} if there was no such call.
+         */
+        private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
+
+        UpToDatePeekCursor(byte[] upperBound, ColumnFamily indexCf, byte[] 
lowerBound) {
+            this.lowerBound = lowerBound;
+            upperBoundSlice = new Slice(upperBound);
+            options = new ReadOptions().setIterateUpperBound(upperBoundSlice);
+            it = indexCf.newIterator(options);
+        }
+
+        /**
+         * Maps the key from the index into the required result.
+         */
+        protected abstract T map(ByteBuffer byteBuffer);
+
+        @Override
+        public void close() {
+            try {
+                closeAll(it, options, upperBoundSlice);
+            } catch (Exception e) {
+                throw new StorageException("Error closing cursor", e);
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return busy(this::advanceIfNeededBusy);
+        }
+
+        @Override
+        public T next() {
+            return busy(() -> {
+                if (!advanceIfNeededBusy()) {
+                    throw new NoSuchElementException();
+                }
+
+                this.hasNext = null;
+
+                return map(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
+            });
+        }
+
+        @Override
+        public @Nullable T peek() {
+            return busy(() -> {
+                throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+                byte[] res = peekBusy();
+
+                if (res == null) {
+                    return null;
+                } else {
+                    return map(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
+                }
+            });
+        }
+
+        private byte @Nullable [] peekBusy() {
+            if (hasNext != null) {
+                return key;
+            }
+
+            refreshAndPrepareRocksIteratorBusy();
+
+            if (!it.isValid()) {
+                RocksUtils.checkIterator(it);
+
+                peekedKey = null;
+            } else {
+                peekedKey = it.key();
+            }
+
+            return peekedKey;
+        }
+
+        private boolean advanceIfNeededBusy() throws StorageException {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
AbstractRocksDbIndexStorage.this::createStorageInfo);
+
+            //noinspection ArrayEquality
+            key = (peekedKey == BYTE_EMPTY_ARRAY) ? peekBusy() : peekedKey;
+            peekedKey = BYTE_EMPTY_ARRAY;
+
+            hasNext = key != null;
+            return hasNext;
+        }
+
+        private void refreshAndPrepareRocksIteratorBusy() {
+            try {
+                it.refresh();
+            } catch (RocksDBException e) {
+                throw new StorageException("Error refreshing an iterator", e);
+            }
+
+            if (key == null) {
+                it.seek(lowerBound);
+            } else {
+                it.seekForPrev(key);
+
+                if (it.isValid()) {
+                    it.next();
+                } else {
+                    RocksUtils.checkIterator(it);
+
+                    it.seek(lowerBound);
+                }
+            }
+        }
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
index 8324f1779a..701c51ecb0 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbHashIndexStorage.java
@@ -24,12 +24,9 @@ import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PAR
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
 
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
-import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.StorageException;
@@ -40,10 +37,7 @@ import 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.HashUtils;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Slice;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteBatchWithIndex;
 import org.rocksdb.WriteOptions;
@@ -113,48 +107,15 @@ public class RocksDbHashIndexStorage extends 
AbstractRocksDbIndexStorage impleme
 
             byte[] rangeEnd = incrementPrefix(rangeStart);
 
-            Slice upperBound = rangeEnd == null ? null : new Slice(rangeEnd);
-
-            ReadOptions options = new 
ReadOptions().setIterateUpperBound(upperBound);
-
-            RocksIterator it = indexCf.newIterator(options);
-
-            it.seek(rangeStart);
-
-            return new RocksIteratorAdapter<RowId>(it) {
+            return new UpToDatePeekCursor<RowId>(rangeEnd, indexCf, 
rangeStart) {
                 @Override
-                protected RowId decodeEntry(byte[] key, byte[] value) {
+                protected RowId map(ByteBuffer byteBuffer) {
                     // RowId UUID is located at the last 16 bytes of the key
-                    long mostSignificantBits = bytesToLong(key, key.length - 
Long.BYTES * 2);
-                    long leastSignificantBits = bytesToLong(key, key.length - 
Long.BYTES);
+                    long mostSignificantBits = 
byteBuffer.getLong(rangeStart.length);
+                    long leastSignificantBits = 
byteBuffer.getLong(rangeStart.length + Long.BYTES);
 
                     return new RowId(helper.partitionId(), 
mostSignificantBits, leastSignificantBits);
                 }
-
-                @Override
-                public boolean hasNext() {
-                    return busy(() -> {
-                        
throwExceptionIfStorageInProgressOfRebalance(state.get(), 
RocksDbHashIndexStorage.this::createStorageInfo);
-
-                        return super.hasNext();
-                    });
-                }
-
-                @Override
-                public RowId next() {
-                    return busy(() -> {
-                        
throwExceptionIfStorageInProgressOfRebalance(state.get(), 
RocksDbHashIndexStorage.this::createStorageInfo);
-
-                        return super.next();
-                    });
-                }
-
-                @Override
-                public void close() {
-                    super.close();
-
-                    RocksUtils.closeAll(options, upperBound);
-                }
             };
         });
     }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 418036e6c0..6544fc270f 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -23,15 +23,12 @@ import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.PAR
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW_ID_SIZE;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageInProgressOfRebalance;
 import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
-import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.NoSuchElementException;
 import java.util.function.Function;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.rocksdb.ColumnFamily;
-import org.apache.ignite.internal.rocksdb.RocksUtils;
 import org.apache.ignite.internal.schema.BinaryTuple;
 import org.apache.ignite.internal.schema.BinaryTuplePrefix;
 import org.apache.ignite.internal.storage.RowId;
@@ -45,10 +42,7 @@ import 
org.apache.ignite.internal.storage.rocksdb.PartitionDataHelper;
 import org.apache.ignite.internal.storage.rocksdb.RocksDbMetaStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
-import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Slice;
 import org.rocksdb.WriteBatch;
 import org.rocksdb.WriteBatchWithIndex;
 
@@ -149,7 +143,7 @@ public class RocksDbSortedIndexStorage extends 
AbstractRocksDbIndexStorage imple
         });
     }
 
-    private <T> PeekCursor<T> scan(
+    protected <T> PeekCursor<T> scan(
             @Nullable BinaryTuplePrefix lowerBound,
             @Nullable BinaryTuplePrefix upperBound,
             boolean includeLower,
@@ -159,7 +153,7 @@ public class RocksDbSortedIndexStorage extends 
AbstractRocksDbIndexStorage imple
         byte[] lowerBoundBytes;
 
         if (lowerBound == null) {
-            lowerBoundBytes = null;
+            lowerBoundBytes = helper.partitionStartPrefix();
         } else {
             lowerBoundBytes = rocksPrefix(lowerBound);
 
@@ -172,7 +166,7 @@ public class RocksDbSortedIndexStorage extends 
AbstractRocksDbIndexStorage imple
         byte[] upperBoundBytes;
 
         if (upperBound == null) {
-            upperBoundBytes = null;
+            upperBoundBytes = helper.partitionEndPrefix();
         } else {
             upperBoundBytes = rocksPrefix(upperBound);
 
@@ -182,119 +176,10 @@ public class RocksDbSortedIndexStorage extends 
AbstractRocksDbIndexStorage imple
             }
         }
 
-        return createScanCursor(lowerBoundBytes, upperBoundBytes, mapper);
-    }
-
-    private <T> PeekCursor<T> createScanCursor(
-            byte @Nullable [] lowerBound,
-            byte @Nullable [] upperBound,
-            Function<ByteBuffer, T> mapper
-    ) {
-        Slice upperBoundSlice = upperBound == null ? new 
Slice(helper.partitionEndPrefix()) : new Slice(upperBound);
-
-        ReadOptions options = new 
ReadOptions().setIterateUpperBound(upperBoundSlice);
-
-        RocksIterator it = indexCf.newIterator(options);
-
-        return new PeekCursor<>() {
-            @Nullable
-            private Boolean hasNext;
-
-            private byte @Nullable [] key;
-
-            private byte @Nullable [] peekedKey = BYTE_EMPTY_ARRAY;
-
-            @Override
-            public void close() {
-                try {
-                    closeAll(it, options, upperBoundSlice);
-                } catch (Exception e) {
-                    throw new StorageException("Error closing cursor", e);
-                }
-            }
-
+        return new UpToDatePeekCursor<>(upperBoundBytes, indexCf, 
lowerBoundBytes) {
             @Override
-            public boolean hasNext() {
-                return busy(this::advanceIfNeeded);
-            }
-
-            @Override
-            public T next() {
-                return busy(() -> {
-                    if (!advanceIfNeeded()) {
-                        throw new NoSuchElementException();
-                    }
-
-                    this.hasNext = null;
-
-                    return 
mapper.apply(ByteBuffer.wrap(key).order(KEY_BYTE_ORDER));
-                });
-            }
-
-            @Override
-            public @Nullable T peek() {
-                return busy(() -> {
-                    throwExceptionIfStorageInProgressOfRebalance(state.get(), 
RocksDbSortedIndexStorage.this::createStorageInfo);
-
-                    byte[] res = peek0();
-
-                    if (res == null) {
-                        return null;
-                    } else {
-                        return 
mapper.apply(ByteBuffer.wrap(res).order(KEY_BYTE_ORDER));
-                    }
-                });
-            }
-
-            private byte @Nullable [] peek0() {
-                if (hasNext != null) {
-                    return key;
-                }
-
-                refreshAndPrepareRocksIterator();
-
-                if (!it.isValid()) {
-                    RocksUtils.checkIterator(it);
-
-                    peekedKey = null;
-                } else {
-                    peekedKey = it.key();
-                }
-
-                return peekedKey;
-            }
-
-            private boolean advanceIfNeeded() throws StorageException {
-                throwExceptionIfStorageInProgressOfRebalance(state.get(), 
RocksDbSortedIndexStorage.this::createStorageInfo);
-
-                //noinspection ArrayEquality
-                key = (peekedKey == BYTE_EMPTY_ARRAY) ? peek0() : peekedKey;
-                peekedKey = BYTE_EMPTY_ARRAY;
-
-                hasNext = key != null;
-                return hasNext;
-            }
-
-            private void refreshAndPrepareRocksIterator() {
-                try {
-                    it.refresh();
-                } catch (RocksDBException e) {
-                    throw new StorageException("Error refreshing an iterator", 
e);
-                }
-
-                if (key == null) {
-                    it.seek(lowerBound == null ? helper.partitionStartPrefix() 
: lowerBound);
-                } else {
-                    it.seekForPrev(key);
-
-                    if (it.isValid()) {
-                        it.next();
-                    } else {
-                        RocksUtils.checkIterator(it);
-
-                        it.seek(lowerBound == null ? 
helper.partitionStartPrefix() : lowerBound);
-                    }
-                }
+            protected T map(ByteBuffer byteBuffer) {
+                return mapper.apply(byteBuffer);
             }
         };
     }


Reply via email to