This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 1a81a07e0d IGNITE-18322 Define scan contract SortedIndexStorage (#1407) 1a81a07e0d is described below commit 1a81a07e0da50a3d0ba7e1d8b3d911e106464f99 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Mon Dec 12 12:28:42 2022 +0300 IGNITE-18322 Define scan contract SortedIndexStorage (#1407) --- .../tree/AbstractBplusTreePageMemoryTest.java | 24 + .../ignite/internal/pagememory/tree/BplusTree.java | 75 +++ .../apache/ignite/internal/rocksdb/RocksUtils.java | 3 + .../index/AbstractSortedIndexStorageTest.java | 713 ++++++++++++++++++++- .../index/impl/BinaryTupleRowSerializer.java | 2 +- .../storage/index/impl/TestSortedIndexStorage.java | 198 ++++-- .../index/sorted/PageMemorySortedIndexStorage.java | 100 ++- .../pagememory/index/sorted/SortedIndexTree.java | 7 + .../rocksdb/index/RocksDbSortedIndexStorage.java | 79 ++- 9 files changed, 1106 insertions(+), 95 deletions(-) diff --git a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java index b9d7c03739..0983eda760 100644 --- a/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java +++ b/modules/page-memory/src/integrationTest/java/org/apache/ignite/internal/pagememory/tree/AbstractBplusTreePageMemoryTest.java @@ -2333,6 +2333,30 @@ public abstract class AbstractBplusTreePageMemoryTest extends BaseIgniteAbstract } } + @Test + void testFindNext() throws Exception { + TestTree tree = createTestTree(true); + + assertNull(tree.findNext(0L, false)); + assertNull(tree.findNext(0L, true)); + + tree.put(0L); + + assertNull(tree.findNext(0L, false)); + assertEquals(0L, tree.findNext(0L, true)); + + tree.put(1L); + + assertEquals(1L, tree.findNext(0L, false)); + assertEquals(0L, tree.findNext(0L, true)); + + assertNull(tree.findNext(1L, false)); + assertEquals(1L, tree.findNext(1L, true)); + + assertEquals(0L, tree.findNext(-1L, false)); + assertEquals(0L, tree.findNext(-1L, true)); + } + private void doTestRandomPutRemoveMultithreaded(boolean canGetRow) throws Exception { final TestTree tree = createTestTree(canGetRow); diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java index 15214e04db..c601025f84 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/tree/BplusTree.java @@ -1572,6 +1572,34 @@ public abstract class BplusTree<L, T extends L> extends DataStructure implements return findOne(row, null, null); } + /** + * Searches for the row that (strictly or loosely, depending on {@code includeRow}) follows the lowerBound passed as an argument. + * + * @param lowerBound Lower bound. + * @param includeRow {@code True} if you include the passed row in the result. + * @return Next row. + * @throws IgniteInternalCheckedException If failed. + */ + public final @Nullable T findNext(L lowerBound, boolean includeRow) throws IgniteInternalCheckedException { + checkDestroyed(); + + GetNext g = new GetNext(lowerBound, includeRow); + + try { + doFind(g); + + return g.nextRow; + } catch (CorruptedDataStructureException e) { + throw e; + } catch (IgniteInternalCheckedException e) { + throw new IgniteInternalCheckedException("Runtime failure on lookup next row: " + lowerBound, e); + } catch (RuntimeException | AssertionError e) { + throw corruptedTreeException("Runtime failure on lookup next row: " + lowerBound, e, grpId, g.pageId); + } finally { + checkDestroyed(); + } + } + /** * Tries to find. * @@ -6222,6 +6250,53 @@ public abstract class BplusTree<L, T extends L> extends DataStructure implements } } + /** + * Class for getting the next row. + */ + private final class GetNext extends Get { + @Nullable + private T nextRow; + + private GetNext(L row, boolean includeRow) { + super(row, false); + + shift = includeRow ? -1 : 1; + } + + @Override + boolean found(BplusIo<L> io, long pageAddr, int idx, int lvl) { + // Must never be called because we always have a shift. + throw new IllegalStateException(); + } + + @Override + boolean notFound(BplusIo<L> io, long pageAddr, int idx, int lvl) throws IgniteInternalCheckedException { + if (lvl != 0) { + return false; + } + + int cnt = io.getCount(pageAddr); + + if (cnt == 0) { + // Empty tree. + assert io.getForward(pageAddr, partId) == 0L; + } else { + assert io.isLeaf() : io; + assert cnt > 0 : cnt; + assert idx >= 0 : idx; + assert cnt >= idx : "cnt=" + cnt + ", idx=" + idx; + + checkDestroyed(); + + if (idx < cnt) { + nextRow = getRow(io, pageAddr, idx); + } + } + + return true; + } + } + /** * Page handler for basic {@link Get} operation. */ diff --git a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java index 28a0a68b47..d6fa91a3f3 100644 --- a/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java +++ b/modules/rocksdb-common/src/main/java/org/apache/ignite/internal/rocksdb/RocksUtils.java @@ -69,6 +69,9 @@ public class RocksUtils { /** * Checks the status of the iterator and throws an exception if it is not correct. * + * <p>Check the status first. This operation is guaranteed to throw if an internal error has occurred during the iteration. Otherwise, + * we've exhausted the data range. + * * @param it RocksDB iterator. * @throws IgniteInternalException if the iterator has an incorrect status. */ diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java index 3b1d5ebc01..13c2f3123d 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractSortedIndexStorageTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.storage.index; +import static java.util.function.Function.identity; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toUnmodifiableList; import static org.apache.ignite.internal.schema.testutils.SchemaConfigurationConverter.addIndex; @@ -37,14 +38,20 @@ import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.NoSuchElementException; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -227,14 +234,14 @@ public abstract class AbstractSortedIndexStorageTest { } @Test - void testEmpty() throws Exception { + void testEmpty() { SortedIndexStorage index = createIndexStorage(shuffledRandomDefinitions()); assertThat(scan(index, null, null, 0), is(empty())); } @Test - void testGet() throws Exception { + void testGet() { SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_INDEX") .addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done() .addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done() @@ -267,7 +274,7 @@ public abstract class AbstractSortedIndexStorageTest { */ @ParameterizedTest @VariableSource("ALL_TYPES_COLUMN_DEFINITIONS") - void testSingleColumnIndex(ColumnDefinition columnDefinition) throws Exception { + void testSingleColumnIndex(ColumnDefinition columnDefinition) { testPutGetRemove(List.of(columnDefinition)); } @@ -275,7 +282,7 @@ public abstract class AbstractSortedIndexStorageTest { * Tests that appending an already existing row does no harm. */ @Test - void testPutIdempotence() throws Exception { + void testPutIdempotence() { SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_INDEX") .addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done() .addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done() @@ -302,7 +309,7 @@ public abstract class AbstractSortedIndexStorageTest { * Tests that it is possible to add rows with the same columns but different Row IDs. */ @Test - void testMultiplePuts() throws Exception { + void testMultiplePuts() { SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_INDEX") .addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done() .addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done() @@ -335,7 +342,7 @@ public abstract class AbstractSortedIndexStorageTest { * Tests the {@link SortedIndexStorage#remove} method. */ @Test - void testRemove() throws Exception { + void testRemove() { SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_INDEX") .addIndexColumn(ColumnTypeSpec.STRING.name()).asc().done() .addIndexColumn(ColumnTypeSpec.INT32.name()).asc().done() @@ -389,7 +396,7 @@ public abstract class AbstractSortedIndexStorageTest { * Tests the Put-Get-Remove case when an index is created using all possible column in random order. */ @RepeatedTest(5) - void testCreateMultiColumnIndex() throws Exception { + void testCreateMultiColumnIndex() { testPutGetRemove(shuffledDefinitions()); } @@ -397,7 +404,7 @@ public abstract class AbstractSortedIndexStorageTest { * Tests the happy case of the {@link SortedIndexStorage#scan} method. */ @RepeatedTest(5) - void testScan() throws Exception { + void testScan() { SortedIndexStorage indexStorage = createIndexStorage(shuffledDefinitions()); List<TestIndexRow> entries = IntStream.range(0, 10) @@ -434,7 +441,7 @@ public abstract class AbstractSortedIndexStorageTest { } @Test - public void testBoundsAndOrder() throws Exception { + public void testBoundsAndOrder() { ColumnTypeSpec string = ColumnTypeSpec.STRING; ColumnTypeSpec int32 = ColumnTypeSpec.INT32; @@ -525,7 +532,7 @@ public abstract class AbstractSortedIndexStorageTest { * Tests that an empty range is returned if {@link SortedIndexStorage#scan} method is called using overlapping keys. */ @Test - void testEmptyRange() throws Exception { + void testEmptyRange() { List<ColumnDefinition> indexSchema = shuffledRandomDefinitions(); SortedIndexStorage indexStorage = createIndexStorage(indexSchema); @@ -549,7 +556,7 @@ public abstract class AbstractSortedIndexStorageTest { @ParameterizedTest @VariableSource("ALL_TYPES_COLUMN_DEFINITIONS") - void testNullValues(ColumnDefinition columnDefinition) throws Exception { + void testNullValues(ColumnDefinition columnDefinition) { SortedIndexStorage storage = createIndexStorage(List.of(columnDefinition)); TestIndexRow entry1 = TestIndexRow.randomRow(storage); @@ -579,6 +586,653 @@ public abstract class AbstractSortedIndexStorageTest { } } + /** + * Checks simple scenarios for a scanning cursor. + */ + @Test + void testScanSimple() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + for (int i = 0; i < 5; i++) { + put(indexStorage, serializer.serializeRow(new Object[]{i}, new RowId(TEST_PARTITION))); + } + + // Checking without borders. + assertThat( + scan(indexStorage, null, null, 0, AbstractSortedIndexStorageTest::firstArrayElement), + contains(0, 1, 2, 3, 4) + ); + + // Let's check with borders. + assertThat( + scan( + indexStorage, + serializer.serializeRowPrefix(0), + serializer.serializeRowPrefix(4), + (GREATER_OR_EQUAL | LESS_OR_EQUAL), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(0, 1, 2, 3, 4) + ); + + assertThat( + scan( + indexStorage, + serializer.serializeRowPrefix(0), + serializer.serializeRowPrefix(4), + (GREATER_OR_EQUAL | LESS), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(0, 1, 2, 3) + ); + + assertThat( + scan( + indexStorage, + serializer.serializeRowPrefix(0), + serializer.serializeRowPrefix(4), + (GREATER | LESS_OR_EQUAL), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(1, 2, 3, 4) + ); + + assertThat( + scan( + indexStorage, + serializer.serializeRowPrefix(0), + serializer.serializeRowPrefix(4), + (GREATER | LESS), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(1, 2, 3) + ); + + // Let's check only with the lower bound. + assertThat( + scan( + indexStorage, + serializer.serializeRowPrefix(1), + null, + (GREATER_OR_EQUAL | LESS_OR_EQUAL), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(1, 2, 3, 4) + ); + + assertThat( + scan( + indexStorage, + serializer.serializeRowPrefix(1), + null, + (GREATER_OR_EQUAL | LESS), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(1, 2, 3, 4) + ); + + assertThat( + scan( + indexStorage, + serializer.serializeRowPrefix(1), + null, + (GREATER | LESS_OR_EQUAL), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(2, 3, 4) + ); + + assertThat( + scan( + indexStorage, + serializer.serializeRowPrefix(1), + null, + (GREATER | LESS), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(2, 3, 4) + ); + + // Let's check only with the upper bound. + assertThat( + scan( + indexStorage, + null, + serializer.serializeRowPrefix(3), + (GREATER_OR_EQUAL | LESS_OR_EQUAL), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(0, 1, 2, 3) + ); + + assertThat( + scan( + indexStorage, + null, + serializer.serializeRowPrefix(3), + (GREATER_OR_EQUAL | LESS), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(0, 1, 2) + ); + + assertThat( + scan( + indexStorage, + null, + serializer.serializeRowPrefix(3), + (GREATER | LESS_OR_EQUAL), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(0, 1, 2, 3) + ); + + assertThat( + scan( + indexStorage, + null, + serializer.serializeRowPrefix(3), + (GREATER | LESS), + AbstractSortedIndexStorageTest::firstArrayElement + ), + contains(0, 1, 2) + ); + } + + @Test + void testScanContractAddRowBeforeInvokeHasNext() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + // index = [0] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{0}, new RowId(TEST_PARTITION))); + + // index = [0] + // cursor = ^ with cached [0] + assertTrue(scan.hasNext()); + // index = [0] + // cursor = ^ with no cached row + assertEquals(0, serializer.deserializeColumns(scan.next())[0]); + + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractAddRowAfterInvokeHasNext() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + // index = + // cursor = ^ already finished + assertFalse(scan.hasNext()); + + // index = [0] + // cursor = ^ already finished + put(indexStorage, serializer.serializeRow(new Object[]{0}, new RowId(TEST_PARTITION))); + + // index = [0] + // cursor = ^ already finished + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + + // index = [0] + // cursor = ^ no cached row + scan = indexStorage.scan(null, null, 0); + + // index = [0] + // cursor = ^ with cached [0] + assertTrue(scan.hasNext()); + // index = [0] + // cursor = ^ with no cached row + assertEquals(0, serializer.deserializeColumns(scan.next())[0]); + + // index = [0] [1] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{1}, new RowId(TEST_PARTITION))); + + // index = [0] [1] + // cursor = ^ with cached [1] + assertTrue(scan.hasNext()); + // index = [0] [1] + // cursor = ^ with no cached row + assertEquals(1, serializer.deserializeColumns(scan.next())[0]); + + // index = [0] [1] + // cursor = ^ already finished + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractInvokeOnlyNext() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + // index = + // cursor = ^ with no cached row + assertThrows(NoSuchElementException.class, scan::next); + + // index = [0] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{0}, new RowId(TEST_PARTITION))); + + // index = [0] + // cursor = ^ already finished + assertThrows(NoSuchElementException.class, scan::next); + + // index = [0] + // cursor = ^ no cached row + scan = indexStorage.scan(null, null, 0); + + // index = [0] + // cursor = ^ no cached row + assertEquals(0, serializer.deserializeColumns(scan.next())[0]); + + assertThrows(NoSuchElementException.class, scan::next); + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractAddRowsOnly() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + RowId rowId0 = new RowId(TEST_PARTITION, 0, 0); + RowId rowId1 = new RowId(TEST_PARTITION, 0, 1); + RowId rowId2 = new RowId(TEST_PARTITION, 1, 0); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + // index = [0, r1] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1)); + + // index = [0, r1] + // cursor = ^ with no cached row + IndexRow nextRow = scan.next(); + + assertEquals(0, serializer.deserializeColumns(nextRow)[0]); + assertEquals(rowId1, nextRow.rowId()); + + // index = [0, r0] [0, r1] [0, r2] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0)); + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId2)); + + // index = [0, r0] [0, r1] [0, r2] + // cursor = ^ with cached [0, r2] + assertTrue(scan.hasNext()); + + // index = [0, r0] [0, r1] [0, r2] + // cursor = ^ with no cached row + nextRow = scan.next(); + + assertEquals(0, serializer.deserializeColumns(nextRow)[0]); + assertEquals(rowId2, nextRow.rowId()); + + // index = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId0)); + put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId0)); + + // index = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0] + // cursor = ^ with cached [1, r0] + assertTrue(scan.hasNext()); + + // index = [-1, r0] [0, r0] [0, r1] [0, r2] [1, r0] + // cursor = ^ with no cached row + nextRow = scan.next(); + + assertEquals(1, serializer.deserializeColumns(nextRow)[0]); + assertEquals(rowId0, nextRow.rowId()); + + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractForFinishCursor() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + // index = + // cursor = ^ with no cached row + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + + // index = [0] + // cursor = ^ already finished + put(indexStorage, serializer.serializeRow(new Object[]{0}, new RowId(TEST_PARTITION, 0, 0))); + + // index = [0] + // cursor = ^ already finished + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + + scan = indexStorage.scan(null, null, 0); + + // index = [0] + // cursor = ^ with cached [0, r0] + assertTrue(scan.hasNext()); + // index = [0] + // cursor = ^ with no cached row + assertEquals(0, serializer.deserializeColumns(scan.next())[0]); + + // index = [0] + // cursor = ^ already finished + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + + // index = [-1] [0] + // cursor = ^ already finished + put(indexStorage, serializer.serializeRow(new Object[]{-1}, new RowId(TEST_PARTITION, 0, 0))); + + // index = [-1] [0] + // cursor = ^ already finished + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractNextMethodOnly() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + RowId rowId0 = new RowId(TEST_PARTITION, 0, 0); + RowId rowId1 = new RowId(TEST_PARTITION, 0, 1); + RowId rowId2 = new RowId(TEST_PARTITION, 0, 1); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0)); + + // index = [0, r0] + // cursor = ^ with no cached row + IndexRow nextRow = scan.next(); + + assertEquals(0, serializer.deserializeColumns(nextRow)[0]); + assertEquals(rowId0, nextRow.rowId()); + + // index = [0, r0] [0, r1] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1)); + + // index = [0, r0] [0, r1] + // cursor = ^ with no cached row + nextRow = scan.next(); + + assertEquals(0, serializer.deserializeColumns(nextRow)[0]); + assertEquals(rowId1, nextRow.rowId()); + + // index = [-1, r2] [0, r0] [0, r1] [1, r2] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId2)); + put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId2)); + + // index = [-1, r2] [0, r0] [0, r1] [1, r2] + // cursor = ^ with no cached row + nextRow = scan.next(); + + assertEquals(1, serializer.deserializeColumns(nextRow)[0]); + assertEquals(rowId2, nextRow.rowId()); + + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractRemoveRowsOnly() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + RowId rowId0 = new RowId(TEST_PARTITION, 0, 0); + RowId rowId1 = new RowId(TEST_PARTITION, 0, 1); + + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0)); + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1)); + put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId0)); + put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1)); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + // index = [0, r0] [0, r1] [1, r0] [2, r1] + // cursor = ^ with cached [0, r0] + assertTrue(scan.hasNext()); + + // index = [0, r1] [1, r0] [2, r1] + // cursor = ^ with cached [0, r0] + remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId0)); + + // index = [0, r1] [1, r0] [2, r1] + // cursor = ^ with no cached row + IndexRow nextRow = scan.next(); + + assertEquals(0, serializer.deserializeColumns(nextRow)[0]); + assertEquals(rowId0, nextRow.rowId()); + + // index = [1, r0] [2, r1] + // cursor = ^ with no cached row + remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId1)); + + // index = [1, r0] [2, r1] + // cursor = ^ with cached [1, r0] + assertTrue(scan.hasNext()); + + // index = [1, r0] [2, r1] + // cursor = ^ with no cached row + nextRow = scan.next(); + + assertEquals(1, serializer.deserializeColumns(nextRow)[0]); + assertEquals(rowId0, nextRow.rowId()); + + // index = [1, r0] + // cursor = ^ with no cached row + remove(indexStorage, serializer.serializeRow(new Object[]{2}, rowId1)); + + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractReplaceRow() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + RowId rowId = new RowId(TEST_PARTITION); + + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId)); + put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId)); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + // index = [0] [2] + // cursor = ^ with no cached row + assertEquals(0, serializer.deserializeColumns(scan.next())[0]); + + // Replace 0 -> 1. + // index = [1] [2] + // cursor = ^ with no cached row + remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId)); + put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId)); + + // index = [1] [2] + // cursor = ^ with cached [1] + assertTrue(scan.hasNext()); + // index = [1] [2] + // cursor = ^ with no cached row + assertEquals(1, serializer.deserializeColumns(scan.next())[0]); + + // index = [1] [2] + // cursor = ^ with cached [2] + assertTrue(scan.hasNext()); + // index = [1] [2] + // cursor = ^ with no cached row + assertEquals(2, serializer.deserializeColumns(scan.next())[0]); + + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractRemoveCachedRow() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + RowId rowId = new RowId(TEST_PARTITION); + + // index = [0] [1] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId)); + put(indexStorage, serializer.serializeRow(new Object[]{1}, rowId)); + + // index = [0] [1] + // cursor = ^ with cached [0] + assertTrue(scan.hasNext()); + + // index = [1] + // cursor = ^ with cached [0] + remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId)); + + // index = [1] + // cursor = ^ with no cached row + assertEquals(0, serializer.deserializeColumns(scan.next())[0]); + + // index = [1] + // cursor = ^ with cached [1] + assertTrue(scan.hasNext()); + + // index = + // cursor = ^ with cached [1] + remove(indexStorage, serializer.serializeRow(new Object[]{1}, rowId)); + + // index = + // cursor = ^ with no cached row + assertEquals(1, serializer.deserializeColumns(scan.next())[0]); + + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + + scan = indexStorage.scan(null, null, 0); + + // index = [2] + // cursor = ^ with no cached row + put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId)); + + // index = [2] + // cursor = ^ with cached [2] + assertTrue(scan.hasNext()); + + // index = + // cursor = ^ with cached [2] + remove(indexStorage, serializer.serializeRow(new Object[]{2}, rowId)); + + // index = + // cursor = ^ with no cached row + assertEquals(2, serializer.deserializeColumns(scan.next())[0]); + + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + } + + @Test + void testScanContractRemoveNextAndAddFirstRow() { + SortedIndexDefinition indexDefinition = SchemaBuilders.sortedIndex("TEST_IDX") + .addIndexColumn(ColumnType.INT32.typeSpec().name()).asc().done() + .build(); + + SortedIndexStorage indexStorage = createIndexStorage(indexDefinition); + + BinaryTupleRowSerializer serializer = new BinaryTupleRowSerializer(indexStorage.indexDescriptor()); + + RowId rowId = new RowId(TEST_PARTITION); + + put(indexStorage, serializer.serializeRow(new Object[]{0}, rowId)); + put(indexStorage, serializer.serializeRow(new Object[]{2}, rowId)); + + Cursor<IndexRow> scan = indexStorage.scan(null, null, 0); + + // index = [0] [2] + // cursor = ^ with no cached row + assertEquals(0, serializer.deserializeColumns(scan.next())[0]); + + // index = [-1] [2] + // cursor = ^ with no cached row + remove(indexStorage, serializer.serializeRow(new Object[]{0}, rowId)); + put(indexStorage, serializer.serializeRow(new Object[]{-1}, rowId)); + + // index = [-1] [2] + // cursor = ^ with cached [2] + assertTrue(scan.hasNext()); + // index = [-1] [2] + // cursor = ^ with no cached row + assertEquals(2, serializer.deserializeColumns(scan.next())[0]); + + assertFalse(scan.hasNext()); + assertThrows(NoSuchElementException.class, scan::next); + } + private List<ColumnDefinition> shuffledRandomDefinitions() { return shuffledDefinitions(d -> random.nextBoolean()); } @@ -611,7 +1265,7 @@ public abstract class AbstractSortedIndexStorageTest { * Tests the Get-Put-Remove scenario: inserts some keys into the storage and checks that they have been successfully persisted and can * be removed. */ - private void testPutGetRemove(List<ColumnDefinition> indexSchema) throws Exception { + private void testPutGetRemove(List<ColumnDefinition> indexSchema) { SortedIndexStorage indexStorage = createIndexStorage(indexSchema); TestIndexRow entry1 = TestIndexRow.randomRow(indexStorage); @@ -644,7 +1298,7 @@ public abstract class AbstractSortedIndexStorageTest { * Extracts a single value by a given key or {@code null} if it does not exist. */ @Nullable - private static IndexRow getSingle(SortedIndexStorage indexStorage, BinaryTuple fullPrefix) throws Exception { + private static IndexRow getSingle(SortedIndexStorage indexStorage, BinaryTuple fullPrefix) { List<RowId> rowIds = get(indexStorage, fullPrefix); assertThat(rowIds, anyOf(empty(), hasSize(1))); @@ -663,17 +1317,28 @@ public abstract class AbstractSortedIndexStorageTest { @Nullable BinaryTuplePrefix lowerBound, @Nullable BinaryTuplePrefix upperBound, @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags - ) throws Exception { + ) { + return scan(index, lowerBound, upperBound, flags, identity()); + } + + private static <T> List<T> scan( + SortedIndexStorage index, + @Nullable BinaryTuplePrefix lowerBound, + @Nullable BinaryTuplePrefix upperBound, + @MagicConstant(flagsFromClass = SortedIndexStorage.class) int flags, + Function<Object[], T> mapper + ) { var serializer = new BinaryTupleRowSerializer(index.indexDescriptor()); try (Cursor<IndexRow> cursor = index.scan(lowerBound, upperBound, flags)) { return cursor.stream() .map(serializer::deserializeColumns) + .map(mapper) .collect(toUnmodifiableList()); } } - protected static List<RowId> get(SortedIndexStorage index, BinaryTuple key) throws Exception { + protected static List<RowId> get(SortedIndexStorage index, BinaryTuple key) { try (Cursor<RowId> cursor = index.get(key)) { return cursor.stream().collect(toUnmodifiableList()); } @@ -694,4 +1359,22 @@ public abstract class AbstractSortedIndexStorageTest { return null; }); } + + private static <T> List<T> getRemaining(Cursor<IndexRow> scanCursor, Function<IndexRow, T> mapper) { + List<T> result = new ArrayList<>(); + + while (scanCursor.hasNext()) { + result.add(mapper.apply(scanCursor.next())); + } + + return result; + } + + private static <T> Function<IndexRow, T> firstColumn(BinaryTupleRowSerializer serializer) { + return indexRow -> (T) serializer.deserializeColumns(indexRow)[0]; + } + + private static <T> T firstArrayElement(Object[] objects) { + return (T) objects[0]; + } } diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java index 9b029d0018..65c6308656 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/BinaryTupleRowSerializer.java @@ -108,7 +108,7 @@ public class BinaryTupleRowSerializer { /** * Creates a prefix of an {@link IndexRow} using the provided columns. */ - public BinaryTuplePrefix serializeRowPrefix(Object[] prefixColumnValues) { + public BinaryTuplePrefix serializeRowPrefix(Object... prefixColumnValues) { if (prefixColumnValues.length > schema.size()) { throw new IllegalArgumentException(String.format( "Incorrect number of column values passed. Expected not more than %d, got %d", diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java index 4f7f727cb2..bef8d5846d 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/impl/TestSortedIndexStorage.java @@ -17,14 +17,14 @@ package org.apache.ignite.internal.storage.index.impl; -import static org.apache.ignite.internal.util.IgniteUtils.capacity; +import static java.util.Collections.emptyNavigableMap; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; -import java.util.Set; -import java.util.SortedMap; +import java.util.Map.Entry; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.NoSuchElementException; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; @@ -45,7 +45,13 @@ import org.jetbrains.annotations.Nullable; * Test implementation of MV sorted index storage. */ public class TestSortedIndexStorage implements SortedIndexStorage { - private final ConcurrentNavigableMap<ByteBuffer, Set<RowId>> index; + private static final Object NULL = new Object(); + + /** + * {@code NavigableMap<RowId, Object>} is used as a {@link NavigableSet}, but map was chosen because methods like + * {@link NavigableSet#first()} throw an {@link NoSuchElementException} if the set is empty. + */ + private final ConcurrentNavigableMap<ByteBuffer, NavigableMap<RowId, Object>> index; private final SortedIndexDescriptor descriptor; @@ -68,11 +74,28 @@ public class TestSortedIndexStorage implements SortedIndexStorage { public Cursor<RowId> get(BinaryTuple key) throws StorageException { checkClosed(); - Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(), Set.of()).stream() - .peek(rowId -> checkClosed()) - .iterator(); + Iterator<RowId> iterator = index.getOrDefault(key.byteBuffer(), emptyNavigableMap()).keySet().iterator(); + + return new Cursor<>() { + @Override + public void close() { + // No-op. + } + + @Override + public boolean hasNext() { + checkClosed(); + + return iterator.hasNext(); + } - return Cursor.fromBareIterator(iterator); + @Override + public RowId next() { + checkClosed(); + + return iterator.next(); + } + }; } @Override @@ -80,18 +103,11 @@ public class TestSortedIndexStorage implements SortedIndexStorage { checkClosed(); index.compute(row.indexColumns().byteBuffer(), (k, v) -> { - if (v == null) { - return Set.of(row.rowId()); - } else if (v.contains(row.rowId())) { - return v; - } else { - var result = new HashSet<RowId>(capacity(v.size() + 1)); + NavigableMap<RowId, Object> rowIds = v == null ? new ConcurrentSkipListMap<>() : v; - result.addAll(v); - result.add(row.rowId()); + rowIds.put(row.rowId(), NULL); - return result; - } + return rowIds; }); } @@ -100,19 +116,9 @@ public class TestSortedIndexStorage implements SortedIndexStorage { checkClosed(); index.computeIfPresent(row.indexColumns().byteBuffer(), (k, v) -> { - if (v.contains(row.rowId())) { - if (v.size() == 1) { - return null; - } else { - var result = new HashSet<>(v); + v.remove(row.rowId()); - result.remove(row.rowId()); - - return result; - } - } else { - return v; - } + return v.isEmpty() ? null : v; }); } @@ -135,48 +141,23 @@ public class TestSortedIndexStorage implements SortedIndexStorage { setEqualityFlag(upperBound); } - SortedMap<ByteBuffer, Set<RowId>> data; + NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> navigableMap; if (lowerBound == null && upperBound == null) { - data = index; + navigableMap = index; } else if (lowerBound == null) { - data = index.headMap(upperBound.byteBuffer()); + navigableMap = index.headMap(upperBound.byteBuffer()); } else if (upperBound == null) { - data = index.tailMap(lowerBound.byteBuffer()); + navigableMap = index.tailMap(lowerBound.byteBuffer()); } else { try { - data = index.subMap(lowerBound.byteBuffer(), upperBound.byteBuffer()); + navigableMap = index.subMap(lowerBound.byteBuffer(), upperBound.byteBuffer()); } catch (IllegalArgumentException e) { - data = Collections.emptySortedMap(); + navigableMap = emptyNavigableMap(); } } - Iterator<? extends IndexRow> iterator = data.entrySet().stream() - .flatMap(e -> { - var tuple = new BinaryTuple(descriptor.binaryTupleSchema(), e.getKey()); - - return e.getValue().stream().map(rowId -> new IndexRowImpl(tuple, rowId)); - }) - .iterator(); - - return new Cursor<>() { - @Override - public void close() { - // No-op. - } - - @Override - public boolean hasNext() { - checkClosed(); - - return iterator.hasNext(); - } - - @Override - public IndexRow next() { - return iterator.next(); - } - }; + return new ScanCursor(navigableMap); } private static void setEqualityFlag(BinaryTuplePrefix prefix) { @@ -208,4 +189,93 @@ public class TestSortedIndexStorage implements SortedIndexStorage { throw new StorageClosedException("Storage is already closed"); } } + + private class ScanCursor implements Cursor<IndexRow> { + private final NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> indexMap; + + @Nullable + private Boolean hasNext; + + @Nullable + private Entry<ByteBuffer, NavigableMap<RowId, Object>> indexMapEntry; + + @Nullable + private RowId rowId; + + private ScanCursor(NavigableMap<ByteBuffer, NavigableMap<RowId, Object>> indexMap) { + this.indexMap = indexMap; + } + + @Override + public void close() { + // No-op. + } + + @Override + public boolean hasNext() { + checkClosed(); + + advanceIfNeeded(); + + return hasNext; + } + + @Override + public IndexRow next() { + checkClosed(); + + advanceIfNeeded(); + + boolean hasNext = this.hasNext; + + if (!hasNext) { + throw new NoSuchElementException(); + } + + this.hasNext = null; + + return new IndexRowImpl(new BinaryTuple(descriptor.binaryTupleSchema(), indexMapEntry.getKey()), rowId); + } + + private void advanceIfNeeded() { + if (hasNext != null) { + return; + } + + if (indexMapEntry == null) { + indexMapEntry = indexMap.firstEntry(); + } + + if (rowId == null) { + if (indexMapEntry != null) { + rowId = getRowId(indexMapEntry.getValue().firstEntry()); + } + } else { + Entry<RowId, Object> nextRowIdEntry = indexMapEntry.getValue().higherEntry(rowId); + + if (nextRowIdEntry != null) { + rowId = nextRowIdEntry.getKey(); + } else { + Entry<ByteBuffer, NavigableMap<RowId, Object>> nextIndexMapEntry = indexMap.higherEntry(indexMapEntry.getKey()); + + if (nextIndexMapEntry == null) { + hasNext = false; + + return; + } else { + indexMapEntry = nextIndexMapEntry; + + rowId = getRowId(indexMapEntry.getValue().firstEntry()); + } + } + } + + hasNext = rowId != null; + } + + @Nullable + private RowId getRowId(@Nullable Entry<RowId, ?> rowIdEntry) { + return rowIdEntry == null ? null : rowIdEntry.getKey(); + } + } } diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java index 4923f005b6..b9d4beffeb 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/PageMemorySortedIndexStorage.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.index.sorted; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; import java.nio.ByteBuffer; +import java.util.NoSuchElementException; import java.util.function.Function; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; import org.apache.ignite.internal.schema.BinaryTuple; @@ -175,9 +176,7 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage { SortedIndexRowKey upper = createBound(upperBound, includeUpper); - return convertCursor(sortedIndexTree.find(lower, upper), this::toIndexRowImpl); - } catch (IgniteInternalCheckedException e) { - throw new StorageException("Failed to create scan cursor", e); + return new ScanCursor(lower, upper); } finally { closeBusyLock.leaveBusy(); } @@ -272,4 +271,99 @@ public class PageMemorySortedIndexStorage implements SortedIndexStorage { } }; } + + private class ScanCursor implements Cursor<IndexRow> { + @Nullable + private Boolean hasNext; + + @Nullable + private final SortedIndexRowKey lower; + + @Nullable + private final SortedIndexRowKey upper; + + @Nullable + private SortedIndexRow treeRow; + + private ScanCursor(@Nullable SortedIndexRowKey lower, @Nullable SortedIndexRowKey upper) { + this.lower = lower; + this.upper = upper; + } + + @Override + public void close() { + // No-op. + } + + @Override + public boolean hasNext() { + if (!closeBusyLock.enterBusy()) { + throwStorageClosedException(); + } + + try { + advanceIfNeeded(); + + return hasNext; + } catch (IgniteInternalCheckedException e) { + throw new StorageException("Error while advancing the cursor", e); + } finally { + closeBusyLock.leaveBusy(); + } + } + + @Override + public IndexRow next() { + if (!closeBusyLock.enterBusy()) { + throwStorageClosedException(); + } + + try { + advanceIfNeeded(); + + boolean hasNext = this.hasNext; + + if (!hasNext) { + throw new NoSuchElementException(); + } + + this.hasNext = null; + + return toIndexRowImpl(treeRow); + } catch (IgniteInternalCheckedException e) { + throw new StorageException("Error while advancing the cursor", e); + } finally { + closeBusyLock.leaveBusy(); + } + } + + private void advanceIfNeeded() throws IgniteInternalCheckedException { + if (hasNext != null) { + return; + } + + if (treeRow == null) { + treeRow = lower == null ? sortedIndexTree.findFirst() : sortedIndexTree.findNext(lower, true); + } else { + SortedIndexRow next = sortedIndexTree.findNext(treeRow, false); + + if (next == null) { + hasNext = false; + + return; + } else { + treeRow = next; + } + } + + hasNext = treeRow != null && (upper == null || compareRows(treeRow, upper) < 0); + } + + private int compareRows(SortedIndexRowKey key1, SortedIndexRowKey key2) { + return sortedIndexTree.getBinaryTupleComparator().compare( + key1.indexColumns().valueBuffer(), + key2.indexColumns().valueBuffer() + ); + } + } } diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java index 2217081f09..5c21b14843 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/index/sorted/SortedIndexTree.java @@ -164,4 +164,11 @@ public class SortedIndexTree extends BplusTree<SortedIndexRowKey, SortedIndexRow assert result == Boolean.TRUE : result; } + + /** + * Returns comparator of index columns {@link BinaryTuple}s. + */ + BinaryTupleComparator getBinaryTupleComparator() { + return binaryTupleComparator; + } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java index fb558047e3..b06cc00a74 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java @@ -19,12 +19,13 @@ package org.apache.ignite.internal.storage.rocksdb.index; import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY; import static org.apache.ignite.internal.util.CursorUtils.map; +import static org.apache.ignite.internal.util.IgniteUtils.closeAll; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.NoSuchElementException; import org.apache.ignite.internal.binarytuple.BinaryTupleCommon; import org.apache.ignite.internal.rocksdb.ColumnFamily; -import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter; import org.apache.ignite.internal.rocksdb.RocksUtils; import org.apache.ignite.internal.schema.BinaryTuple; import org.apache.ignite.internal.schema.BinaryTuplePrefix; @@ -167,23 +168,77 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage { RocksIterator it = indexCf.newIterator(options); - if (lowerBound == null) { - it.seek(partitionStorage.partitionStartPrefix()); - } else { - it.seek(lowerBound); - } + return new Cursor<>() { + @Nullable + private Boolean hasNext; + + private byte @Nullable [] key; - return new RocksIteratorAdapter<>(it) { @Override - protected ByteBuffer decodeEntry(byte[] key, byte[] value) { - return ByteBuffer.wrap(key).order(ORDER); + public void close() { + try { + closeAll(it, options, upperBoundSlice); + } catch (Exception e) { + throw new StorageException("Error closing cursor", e); + } } @Override - public void close() { - super.close(); + public boolean hasNext() { + advanceIfNeeded(); + + return hasNext; + } + + @Override + public ByteBuffer next() { + advanceIfNeeded(); + + boolean hasNext = this.hasNext; + + if (!hasNext) { + throw new NoSuchElementException(); + } + + this.hasNext = null; + + return ByteBuffer.wrap(key).order(ORDER); + } + + private void advanceIfNeeded() throws StorageException { + if (hasNext != null) { + return; + } + + try { + it.refresh(); + } catch (RocksDBException e) { + throw new StorageException("Error refreshing an iterator", e); + } + + if (key == null) { + it.seek(lowerBound == null ? partitionStorage.partitionStartPrefix() : lowerBound); + } else { + it.seekForPrev(key); + + if (it.isValid()) { + it.next(); + } else { + RocksUtils.checkIterator(it); + + it.seek(lowerBound == null ? partitionStorage.partitionStartPrefix() : lowerBound); + } + } + + if (!it.isValid()) { + RocksUtils.checkIterator(it); + + hasNext = false; + } else { + key = it.key(); - RocksUtils.closeAll(options, upperBoundSlice); + hasNext = true; + } } }; }