This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 49b45b03721 IGNITE-27293 Improve test coverage of InternalTable.scan
methods (#7270)
49b45b03721 is described below
commit 49b45b037212e3e6ed6816451c5d8d4cb21b02be
Author: Anton Laletin <[email protected]>
AuthorDate: Mon Dec 29 15:35:50 2025 +0400
IGNITE-27293 Improve test coverage of InternalTable.scan methods (#7270)
---
.../internal/runner/app/ItIndexNodeTest.java | 315 +++++++++++++++
.../ItInternalTableReadWriteScanTest.java | 93 -----
.../replicator/PartitionReplicaListener.java | 52 ++-
.../distributed/storage/InternalTableImpl.java | 2 -
.../distributed/storage/InternalTableImplTest.java | 440 ++++++++++++++++++++-
5 files changed, 796 insertions(+), 106 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIndexNodeTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIndexNodeTest.java
new file mode 100644
index 00000000000..3584faa7de9
--- /dev/null
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIndexNodeTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
+import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
+import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasSize;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Flow.Publisher;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.binarytuple.BinaryTupleReader;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.table.IndexScanCriteria;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.OperationContext;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.table.TxContext;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Integration tests running against a single Ignite node validating index
behavior.
+ */
+public class ItIndexNodeTest extends ClusterPerClassIntegrationTest {
+ private static final String TABLE_NAME = "IT_INDEX_TEST_TABLE";
+ private static final String HASH_IDX = "TEST_HASH_IDX";
+ private static final String SORT_IDX = "TEST_SORT_IDX";
+ private static final int LOWER_BOUND = 10;
+ private static final int UPPER_BOUND = 20;
+ private static final int VALUE = 15;
+ private static final int PART_ID = 0;
+ private static final List<Integer> ALL_VALUES_IN_RANGE = List.of(10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20);
+
+ private InternalTable internalTable;
+ private int hashIdx;
+ private int sortIdx;
+
+ TableViewInternal tableViewInternal;
+
+ @Override
+ protected int initialNodes() {
+ return 1;
+ }
+
+ @BeforeAll
+ void setUp(TestInfo testInfo) {
+ Ignite ignite = CLUSTER.node(0);
+
+ // Create zone with 1 partition.
+ String createZoneSql = "CREATE ZONE IF NOT EXISTS test_zone
(PARTITIONS 1, REPLICAS 1) STORAGE PROFILES ['default']";
+
+ String createTableSql = "CREATE TABLE " + TABLE_NAME + "(\n"
+ + " key int PRIMARY KEY,\n"
+ + " field1 int,\n"
+ + " field2 int\n"
+ + ") ZONE test_zone";
+ String createHashIndexSql = "CREATE INDEX IF NOT EXISTS " + HASH_IDX +
" ON " + TABLE_NAME + " USING HASH (field1)";
+ String createSortIndexSql = "CREATE INDEX IF NOT EXISTS " + SORT_IDX +
" ON " + TABLE_NAME + " USING SORTED (field2)";
+ String insertWithParams = "INSERT INTO " + TABLE_NAME + " (key,
field1, field2) VALUES (?, ?, ?)";
+
+ ignite.sql().execute(null, createZoneSql);
+ ignite.sql().execute(null, createTableSql);
+ ignite.sql().execute(null, createHashIndexSql);
+ ignite.sql().execute(null, createSortIndexSql);
+ for (int i = LOWER_BOUND; i <= UPPER_BOUND; i++) {
+ ignite.sql().execute(null, insertWithParams, i, i, i);
+ }
+
+ IgniteImpl igniteImpl = unwrapIgniteImpl(ignite);
+
+ CatalogManager catalogManager = igniteImpl.catalogManager();
+
+ Catalog catalog =
catalogManager.activeCatalog(igniteImpl.clock().nowLong());
+
+ hashIdx = getIndexIdBy(HASH_IDX, catalog);
+ sortIdx = getIndexIdBy(SORT_IDX, catalog);
+
+ // Get the public Table API.
+ Table table = ignite.tables().table(TABLE_NAME);
+
+ assertNotNull(table);
+
+ // Unwrap to get TableViewInternal.
+ tableViewInternal = unwrapTableViewInternal(table);
+
+ // Get the InternalTable.
+ internalTable = tableViewInternal.internalTable();
+
+ assertNotNull(internalTable);
+ }
+
+ @Test
+ void testHashIndexThrowsWithRangeScan() {
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
hashIdx,
+ createMatchingRange());
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThrowsWithCause(() -> {
+ resultFuture.get(10, TimeUnit.SECONDS);
+ }, IllegalStateException.class, "Scan works only with sorted index.");
+ }
+
+ @Test
+ void testReadOnlyHashIndexThrowsWithRangeScan() {
+ Ignite ignite = CLUSTER.node(0);
+ IgniteImpl igniteImpl = unwrapIgniteImpl(ignite);
+ InternalTransaction tx = (InternalTransaction)
ignite.transactions().begin(new TransactionOptions().readOnly(true));
+ OperationContext operationContext =
OperationContext.create(TxContext.readOnly(tx));
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID,
igniteImpl.node(), hashIdx,
+ createMatchingRange(), operationContext);
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThrowsWithCause(() -> {
+ resultFuture.get(10, TimeUnit.SECONDS);
+ }, IllegalStateException.class, "Scan works only with sorted index.");
+ }
+
+ @Test
+ void testHashIndexThrowsWithUnboundRangeScan() {
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
hashIdx,
+ IndexScanCriteria.unbounded());
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThrowsWithCause(() -> {
+ resultFuture.get(10, TimeUnit.SECONDS);
+ }, IllegalStateException.class, "Scan works only with sorted index.");
+ }
+
+ @Test
+ void testSortWorksWithRangeScan() {
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
sortIdx,
+ createMatchingRange());
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThat(resultFuture, willCompleteSuccessfully());
+ }
+
+ @Test
+ void testSortFindsResultWithUnboundRangeScan() throws Exception {
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
sortIdx,
+ IndexScanCriteria.unbounded());
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThat(resultFuture, willCompleteSuccessfully());
+ List<BinaryRow> result = resultFuture.get(10, TimeUnit.SECONDS);
+ assertToBeCorrect(ALL_VALUES_IN_RANGE, result);
+ }
+
+ @Test
+ void testSortFindsResultWithBoundRangeScan() throws Exception {
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
sortIdx,
+ createMatchingRange());
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThat(resultFuture, willCompleteSuccessfully());
+ List<BinaryRow> result = resultFuture.get(10, TimeUnit.SECONDS);
+ assertToBeCorrect(ALL_VALUES_IN_RANGE, result);
+ }
+
+ @Test
+ void testSortFindsNothingWithUnmatchingRangeScan() throws Exception {
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
sortIdx,
+ createUnmatchingRange());
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThat(resultFuture, willCompleteSuccessfully());
+ List<BinaryRow> result = resultFuture.get(10, TimeUnit.SECONDS);
+ assertThat(result, hasSize(0));
+ }
+
+ @Test
+ void testSortFindsNothingWithUnmatchingRangeComplementScan() throws
Exception {
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
sortIdx,
+ createUnmatchingRangeComplement());
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThat(resultFuture, willCompleteSuccessfully());
+ List<BinaryRow> result = resultFuture.get(10, TimeUnit.SECONDS);
+ assertThat(result, hasSize(0));
+ }
+
+ @Test
+ void testScanFailsWithNonExistentIndex() {
+ // Use a non-existent index ID.
+ int nonExistentIndexId = -1;
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID, null,
nonExistentIndexId, IndexScanCriteria.unbounded());
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThrowsWithCause(() -> {
+ resultFuture.get(10, TimeUnit.SECONDS);
+ }, IllegalStateException.class, "Index not found: [id=-1].");
+ }
+
+ @Test
+ void testReadOnlyScanFailsWithNonExistentIndex() {
+ Ignite ignite = CLUSTER.node(0);
+ IgniteImpl igniteImpl = unwrapIgniteImpl(ignite);
+ // Use a non-existent index ID.
+ int nonExistentIndexId = -1;
+ InternalTransaction tx = (InternalTransaction)
ignite.transactions().begin(new TransactionOptions().readOnly(true));
+ OperationContext operationContext =
OperationContext.create(TxContext.readOnly(tx));
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID,
igniteImpl.node(), nonExistentIndexId,
+ IndexScanCriteria.unbounded(), operationContext);
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+ assertThrowsWithCause(() -> {
+ resultFuture.get(10, TimeUnit.SECONDS);
+ }, IllegalStateException.class, "Index not found: [id=-1].");
+ }
+
+ @Test
+ void testHashIndexLookupExactMatch() throws Exception {
+ Ignite ignite = CLUSTER.node(0);
+ // Create a BinaryTuple with value 15 for the hash index lookup
(field1 = 15)
+ BinaryTuple lookupKey = new BinaryTuple(1, new
BinaryTupleBuilder(1).appendInt(VALUE).build());
+ IndexScanCriteria.Lookup lookupCriteria =
IndexScanCriteria.lookup(lookupKey);
+
+ InternalTransaction tx = (InternalTransaction)
ignite.transactions().begin(new TransactionOptions().readOnly(true));
+ OperationContext operationContext =
OperationContext.create(TxContext.readOnly(tx));
+
+ Publisher<BinaryRow> publisher = internalTable.scan(PART_ID,
unwrapIgniteImpl(ignite).node(), hashIdx,
+ lookupCriteria, operationContext);
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+
+ assertThat(resultFuture, willCompleteSuccessfully());
+ List<BinaryRow> result = resultFuture.get(10, TimeUnit.SECONDS);
+
+ assertThat(result, hasSize(1));
+ assertToBeCorrect(List.of(VALUE), result);
+ }
+
+ private int getIndexIdBy(String name, Catalog catalog) {
+ String schemaName = SqlCommon.DEFAULT_SCHEMA_NAME;
+ CatalogIndexDescriptor indexDesc = catalog.aliveIndex(schemaName,
name);
+ int res;
+ if (indexDesc != null) {
+ res = indexDesc.id();
+ } else {
+ throw new IllegalStateException();
+ }
+ return res;
+ }
+
+ private void assertToBeCorrect(List<Integer> expected, List<BinaryRow>
result) {
+ SchemaDescriptor schema =
tableViewInternal.schemaView().lastKnownSchema();
+ List<Integer> actual = result.stream().map(row -> {
+ BinaryTupleReader reader = new BinaryTupleReader(schema.length(),
row.tupleSlice());
+ // We always use key, since by test design all values are the same
among key, field1, field2
+ Column keyColumn = findColumnByName(schema, "key");
+ return reader.intValue(keyColumn.positionInRow());
+ }).collect(Collectors.toList());
+ assertThat(actual, containsInAnyOrder(expected.toArray()));
+ }
+
+ private Column findColumnByName(SchemaDescriptor schema, String
columnName) {
+ return schema.columns().stream()
+ .filter(column -> columnName.equalsIgnoreCase(column.name()))
+ .findFirst()
+ .orElseThrow(() -> new AssertionError(
+ String.format("Can't find column by name:
[columnName=%s]", columnName)
+ ));
+ }
+
+ private IndexScanCriteria.Range createMatchingRange() {
+ return IndexScanCriteria.range(createBinaryTuplePrefix(LOWER_BOUND),
+ createBinaryTuplePrefix(UPPER_BOUND), GREATER_OR_EQUAL |
LESS_OR_EQUAL);
+ }
+
+ private IndexScanCriteria.Range createUnmatchingRange() {
+ return IndexScanCriteria.range(createBinaryTuplePrefix(LOWER_BOUND *
3),
+ createBinaryTuplePrefix(UPPER_BOUND * 3), GREATER_OR_EQUAL |
LESS_OR_EQUAL);
+ }
+
+ private IndexScanCriteria.Range createUnmatchingRangeComplement() {
+ return IndexScanCriteria.range(createBinaryTuplePrefix(UPPER_BOUND),
+ createBinaryTuplePrefix(LOWER_BOUND), GREATER_OR_EQUAL |
LESS_OR_EQUAL);
+ }
+
+ private BinaryTuplePrefix createBinaryTuplePrefix(int value) {
+ BinaryTuple tuple = new BinaryTuple(1, new
BinaryTupleBuilder(1).appendInt(value).build());
+ return BinaryTuplePrefix.fromBinaryTuple(tuple);
+ }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
deleted file mode 100644
index ea137024c5f..00000000000
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadWriteScanTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.distributed;
-
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-import java.util.concurrent.Flow.Publisher;
-import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.network.InternalClusterNode;
-import org.apache.ignite.internal.replicator.PartitionGroupId;
-import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.OperationContext;
-import org.apache.ignite.internal.table.RollbackTxOnErrorPublisher;
-import org.apache.ignite.internal.table.TxContext;
-import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.InternalTxOptions;
-import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
-import org.jetbrains.annotations.Nullable;
-import org.junit.jupiter.api.Test;
-
-/**
- * Tests for {@link InternalTable#scan(int, InternalTransaction)}.
- */
-public class ItInternalTableReadWriteScanTest extends
ItAbstractInternalTableScanTest {
- /** Timestamp tracker. */
- private static final HybridTimestampTracker HYBRID_TIMESTAMP_TRACKER =
HybridTimestampTracker.atomicTracker(null);
-
- @Override
- protected Publisher<BinaryRow> scan(int part, @Nullable
InternalTransaction tx) {
- if (tx == null) {
- return internalTbl.scan(part, null);
- }
-
- PendingTxPartitionEnlistment enlistment = tx.enlistedPartition(new
ZonePartitionId(zoneId, part));
-
- InternalClusterNode primaryNode =
clusterNodeResolver.getByConsistentId(enlistment.primaryNodeConsistentId());
-
- TxContext txContext = TxContext.readWrite(tx,
enlistment.consistencyToken());
-
- return new RollbackTxOnErrorPublisher<>(
- tx,
- internalTbl.scan(part, primaryNode,
OperationContext.create(txContext))
- );
- }
-
- @Test
- public void testInvalidPartitionParameterScan() {
- assertThrows(
- IllegalArgumentException.class,
- () -> scan(-1, null)
- );
-
- assertThrows(
- IllegalArgumentException.class,
- () -> scan(1, null)
- );
- }
-
- @Override
- protected InternalTransaction startTx() {
- InternalTransaction tx =
internalTbl.txManager().beginExplicitRw(HYBRID_TIMESTAMP_TRACKER,
InternalTxOptions.defaults());
-
- int partId = ((PartitionGroupId) internalTbl.groupId()).partitionId();
- var zonePartitionId = new ZonePartitionId(zoneId, partId);
-
- long term = 1L;
-
- tx.assignCommitPartition(zonePartitionId);
-
- InternalClusterNode primaryReplicaNode =
getPrimaryReplica(zonePartitionId);
-
- tx.enlist(zonePartitionId, internalTbl.tableId(),
primaryReplicaNode.name(), term);
-
- return tx;
- }
-}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 6169c45def3..a5c70601d7e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -158,6 +158,7 @@ import
org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.HashIndexStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowImpl;
import org.apache.ignite.internal.storage.index.IndexStorage;
@@ -626,9 +627,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
if (request.indexToUse() != null) {
TableSchemaAwareIndexStorage indexStorage =
secondaryIndexStorages.get().get(request.indexToUse());
- if (indexStorage == null) {
- throw new AssertionError("Index not found: uuid=" +
request.indexToUse());
- }
+ throwsIfIndexNotFound(request.indexToUse(), indexStorage);
if (request.exactKey() != null) {
assert request.lowerBoundPrefix() == null &&
request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";
@@ -642,7 +641,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
});
}
- assert indexStorage.storage() instanceof SortedIndexStorage;
+ throwsIfIndexIsNotSorted(indexStorage);
return safeReadFuture
.thenCompose(unused -> scanSortedIndex(request,
indexStorage))
@@ -957,17 +956,14 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
if (request.indexToUse() != null) {
TableSchemaAwareIndexStorage indexStorage =
secondaryIndexStorages.get().get(request.indexToUse());
- if (indexStorage == null) {
- throw new AssertionError("Index not found: uuid=" +
request.indexToUse());
- }
+ throwsIfIndexNotFound(request.indexToUse(), indexStorage);
if (request.exactKey() != null) {
assert request.lowerBoundPrefix() == null &&
request.upperBoundPrefix() == null : "Index lookup doesn't allow bounds.";
return lookupIndex(request, indexStorage.storage(),
request.coordinatorId());
}
-
- assert indexStorage.storage() instanceof SortedIndexStorage;
+ throwsIfIndexIsNotSorted(indexStorage);
return scanSortedIndex(request, indexStorage);
}
@@ -981,6 +977,44 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
.thenCompose(tblLock ->
retrieveExactEntriesUntilCursorEmpty(txId, request.coordinatorId(), cursorId,
batchCount));
}
+ /**
+ * Validates that the index storage exists for the given index ID.
+ *
+ * <p>This method checks if the provided index storage is {@code null},
which indicates that the index
+ * with the given ID does not exist in the partition's index storage map.
+ *
+ * @param indexId Index identifier. May be {@code null}.
+ * @param indexStorage Index storage retrieved from the partition's index
storage map. May be {@code null}
+ * if the index does not exist.
+ * @throws IllegalStateException If the index storage is {@code null},
indicating the index was not found.
+ */
+ private void throwsIfIndexNotFound(@Nullable Integer indexId,
TableSchemaAwareIndexStorage indexStorage) {
+ if (indexStorage == null) {
+ throw new IllegalStateException(format("Index not found:
[id={}].", indexId));
+ }
+ }
+
+ /**
+ * Validates that the index storage is a sorted index, not a hash index.
+ *
+ * <p>This method ensures that range scan operations are only performed on
sorted indexes.
+ * Hash indexes do not support range scans because they are designed for
exact key lookups only.
+ * Range scans require ordered traversal, which is only available with
sorted indexes.
+ *
+ * <p>If the underlying storage is not a {@link SortedIndexStorage} (e.g.,
it's a {@link HashIndexStorage}),
+ * an exception is thrown to prevent invalid scan operations.
+ *
+ * @param indexStorage Index storage to validate. Must not be {@code null}
(should be validated by
+ * {@link #throwsIfIndexNotFound(Integer,
TableSchemaAwareIndexStorage)} first).
+ * @throws IllegalStateException If the index storage is not a sorted
index. The exception message
+ * indicates that scans work only with sorted indexes.
+ */
+ private void throwsIfIndexIsNotSorted(TableSchemaAwareIndexStorage
indexStorage) {
+ if (!(indexStorage.storage() instanceof SortedIndexStorage)) {
+ throw new IllegalStateException("Scan works only with sorted
index.");
+ }
+ }
+
/**
* Lookup sorted index in RO tx.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index c54652d7208..520304f5be3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -1621,8 +1621,6 @@ public class InternalTableImpl implements InternalTable {
InternalTransaction actualTx = startImplicitRwTxIfNeeded(tx);
- assert !actualTx.isReadOnly();
-
return readWriteScan(partId, actualTx, indexId, criteria);
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 5f7a495d6be..bd55a59e981 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -19,21 +19,31 @@ package
org.apache.ignite.internal.table.distributed.storage;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.GREATER_OR_EQUAL;
+import static
org.apache.ignite.internal.storage.index.SortedIndexStorage.LESS_OR_EQUAL;
import static
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectMultiRowsResponsesWithRestoreOrder;
import static
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.collectRejectedRowsResponses;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static
org.apache.ignite.internal.testframework.flow.TestFlowUtils.subscribeToList;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
+import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.aMapWithSize;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
@@ -41,6 +51,7 @@ import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import java.nio.ByteBuffer;
@@ -52,7 +63,10 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
@@ -64,6 +78,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.SingleClusterNodeResolver;
import
org.apache.ignite.internal.partition.replicator.network.replication.MultipleRowPkReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.MultipleRowReplicaRequest;
+import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
import
org.apache.ignite.internal.partition.replicator.network.replication.ScanRetrieveBatchReplicaRequest;
import
org.apache.ignite.internal.partition.replicator.network.replication.SingleRowPkReplicaRequest;
@@ -76,10 +91,13 @@ import
org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.schema.NullBinaryRow;
import org.apache.ignite.internal.sql.SqlCommon;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.IndexScanCriteria;
+import org.apache.ignite.internal.table.IndexScanCriteria.Range;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.metrics.TableMetricSource;
@@ -88,16 +106,21 @@ import
org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.tx.TxStateMeta;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.test.TestTransactionIds;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.QualifiedNameHelper;
+import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
@@ -144,6 +167,19 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
lenient().when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any()))
.thenReturn(nullCompletedFuture());
+ // Mock for creating implicit transactions when null is passed
+ lenient().when(txManager.beginImplicitRw(any())).then(invocation -> {
+ HybridTimestampTracker tracker = invocation.getArgument(0);
+ return new ReadWriteTransactionImpl(
+ txManager,
+ tracker,
+ TestTransactionIds.newTransactionId(),
+ randomUUID(),
+ true, // implicit
+ 10_000
+ );
+ });
+
lenient().when(replicaService.invoke(anyString(),
any())).then(invocation -> {
ReplicaRequest request = invocation.getArgument(1);
@@ -338,9 +374,13 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
}
private PendingTxPartitionEnlistment extractSingleEnlistmentForZone() {
+ return extractSingleEnlistmentForZone(1, 0);
+ }
+
+ private PendingTxPartitionEnlistment extractSingleEnlistmentForZone(int
expected, int partition) {
Map<ZonePartitionId, PendingTxPartitionEnlistment> capturedEnlistments
= extractEnlistmentsFromTxFinish();
- assertThat(capturedEnlistments, is(aMapWithSize(1)));
- PendingTxPartitionEnlistment enlistment = capturedEnlistments.get(new
ZonePartitionId(ZONE_ID, 0));
+ assertThat(capturedEnlistments, is(aMapWithSize(expected)));
+ PendingTxPartitionEnlistment enlistment = capturedEnlistments.get(new
ZonePartitionId(ZONE_ID, partition));
assertThat(enlistment, is(notNullValue()));
return enlistment;
}
@@ -364,6 +404,18 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
);
}
+ private InternalTransaction newReadOnlyTransaction() {
+ InternalTransaction readOnlyTx = mock(InternalTransaction.class);
+
lenient().when(readOnlyTx.id()).thenReturn(TestTransactionIds.newTransactionId());
+ lenient().when(readOnlyTx.isReadOnly()).thenReturn(true);
+ lenient().when(readOnlyTx.readTimestamp()).thenReturn(clock.now());
+ lenient().when(readOnlyTx.implicit()).thenReturn(false);
+ lenient().when(readOnlyTx.remote()).thenReturn(false);
+ lenient().when(readOnlyTx.state()).thenReturn(null);
+
lenient().when(readOnlyTx.isRolledBackWithTimeoutExceeded()).thenReturn(false);
+ return readOnlyTx;
+ }
+
private enum EnlistingOperation {
GET((table, tx) -> table.get(createBinaryRow(), tx)),
GET_ALL((table, tx) -> table.getAll(List.of(createBinaryRow()), tx)),
@@ -407,4 +459,388 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
return action.apply(table, transaction);
}
}
+
+ @Test
+ void testInvalidPartitionParameterScan() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+
+ // Test negative partition ID
+ assertThrowsWithCause(() -> internalTable.scan(-1, null),
IllegalArgumentException.class,
+ "Invalid partition [partition=-1, minValue=0, maxValue=0].");
+
+ // Test partition ID >= number of partitions (table has 1 partition,
so partition 1 is invalid)
+ assertThrowsWithCause(() -> internalTable.scan(1, null),
IllegalArgumentException.class,
+ "Invalid partition [partition=1, minValue=0, maxValue=0].");
+ }
+
+ @Nested
+ class ScanWithIndexAndRangeCriteriaTest {
+ private static final int VALID_INDEX_ID = 1;
+ private static final int VALID_PARTITION = 0;
+ private static final int PARTITION_COUNT = 3;
+
+ private Supplier<CompletableFuture<Void>> buildAction(InternalTable
table, int partition,
+ InternalTransaction tx, int index, Range range) {
+ return () -> {
+ Publisher<BinaryRow> publisher = table.scan(partition, tx,
index, range);
+ CompletableFuture<Void> resultFuture = new
CompletableFuture<>();
+ publisher.subscribe(new BlackholeSubscriber(resultFuture));
+ return resultFuture;
+ };
+ }
+
+ private Supplier<CompletableFuture<Void>> buildAction(InternalTable
table, InternalTransaction tx, int index, Range range) {
+ return () -> {
+ Publisher<BinaryRow> publisher = table.scan(VALID_PARTITION,
tx, index, range);
+ CompletableFuture<Void> resultFuture = new
CompletableFuture<>();
+ publisher.subscribe(new BlackholeSubscriber(resultFuture));
+ return resultFuture;
+ };
+ }
+
+ private void commitTxAndAssertEnlistment(InternalTransaction tx,
InternalTableImpl internalTable) {
+ assertDoesNotThrow(tx::commit);
+ PendingTxPartitionEnlistment enlistment =
extractSingleEnlistmentForZone();
+ assertThat(enlistment.tableIds(),
contains(internalTable.tableId()));
+ }
+
+ /**
+ * Creates a BinaryTuplePrefix with a single integer value.
+ */
+ private BinaryTuplePrefix createBinaryTuplePrefix(int value) {
+ BinaryTuple tuple = new BinaryTuple(1, new
BinaryTupleBuilder(1).appendInt(value).build());
+ return BinaryTuplePrefix.fromBinaryTuple(tuple);
+ }
+
+ @Test
+ void testInvalidPartitionIdThrowsException() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID,
PARTITION_COUNT);
+ InternalTransaction tx = newReadWriteTransaction();
+ // Test negative partition ID
+
+ assertThrowsWithCause(() -> internalTable.scan(-1, tx,
VALID_INDEX_ID, IndexScanCriteria.unbounded()),
+ IllegalArgumentException.class,
+ "Invalid partition [partition=-1, minValue=0,
maxValue=2]");
+
+ // Test partition ID >= number of partitions
+ assertThrowsWithCause(() -> internalTable.scan(PARTITION_COUNT +
1, tx, VALID_INDEX_ID, IndexScanCriteria.unbounded()),
+ IllegalArgumentException.class,
+ "Invalid partition [partition=4, minValue=0, maxValue=2]");
+
+ // Test partition ID == number of partitions (boundary)
+ assertThrowsWithCause(() -> internalTable.scan(PARTITION_COUNT,
tx, VALID_INDEX_ID, IndexScanCriteria.unbounded()),
+ IllegalArgumentException.class,
+ "Invalid partition [partition=3, minValue=0, maxValue=2]");
+ }
+
+ @Test
+ void testValidPartitionIds() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID,
PARTITION_COUNT);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Test valid partition IDs
+ for (int partition = 0; partition < PARTITION_COUNT; partition++) {
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, partition, tx,
+ VALID_INDEX_ID, IndexScanCriteria.unbounded());
+ assertThat(action.get(), willCompleteSuccessfully());
+ }
+ assertDoesNotThrow(tx::commit);
+ for (int partition = 0; partition < PARTITION_COUNT; partition++) {
+ PendingTxPartitionEnlistment enlistment =
extractSingleEnlistmentForZone(3, partition);
+ assertThat(enlistment.tableIds(),
contains(internalTable.tableId()));
+ }
+ }
+
+ @Test
+ void testNullTransactionCreatesImplicit() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, null, VALID_INDEX_ID, IndexScanCriteria.unbounded());
+ assertThat(action.get(), willCompleteSuccessfully());
+ }
+
+ @Test
+ void testUnboundedRange() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, VALID_INDEX_ID, IndexScanCriteria.unbounded());
+ // Test unbounded range (both bounds null)
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @ParameterizedTest
+ @CsvSource({
+ "0, 0", // GREATER | LESS (both exclusive)
+ "1, 0", // GREATER_OR_EQUAL | LESS (lower inclusive,
upper exclusive)
+ "0, 2", // GREATER | LESS_OR_EQUAL (lower exclusive,
upper inclusive)
+ "1, 2" // GREATER_OR_EQUAL | LESS_OR_EQUAL (both
inclusive)
+ })
+ void testRangeWithDifferentFlags(int lowerFlag, int upperFlag) {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ BinaryTuplePrefix lowerBound = createBinaryTuplePrefix(10);
+ BinaryTuplePrefix upperBound = createBinaryTuplePrefix(20);
+ int flags = lowerFlag | upperFlag;
+
+ IndexScanCriteria.Range criteria =
IndexScanCriteria.range(lowerBound, upperBound, flags);
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, VALID_INDEX_ID, criteria);
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testLowerBoundOnly() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ BinaryTuplePrefix lowerBound = createBinaryTuplePrefix(10);
+ IndexScanCriteria.Range criteria =
IndexScanCriteria.range(lowerBound, null, GREATER_OR_EQUAL);
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, VALID_INDEX_ID, criteria);
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testUpperBoundOnly() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ BinaryTuplePrefix upperBound = createBinaryTuplePrefix(20);
+ IndexScanCriteria.Range criteria = IndexScanCriteria.range(null,
upperBound, LESS_OR_EQUAL);
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, VALID_INDEX_ID, criteria);
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testBothBoundsSet() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ BinaryTuplePrefix lowerBound = createBinaryTuplePrefix(10);
+ BinaryTuplePrefix upperBound = createBinaryTuplePrefix(20);
+ IndexScanCriteria.Range criteria =
IndexScanCriteria.range(lowerBound, upperBound, GREATER_OR_EQUAL |
LESS_OR_EQUAL);
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, VALID_INDEX_ID, criteria);
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testEqualBounds() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ BinaryTuplePrefix bound = createBinaryTuplePrefix(10);
+ IndexScanCriteria.Range criteria = IndexScanCriteria.range(bound,
bound, GREATER_OR_EQUAL | LESS_OR_EQUAL);
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, VALID_INDEX_ID, criteria);
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testFlagsWithNullBounds() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Flags should be ignored when bounds are null, but should not
throw
+ IndexScanCriteria.Range criteria = IndexScanCriteria.range(null,
null, GREATER_OR_EQUAL | LESS_OR_EQUAL);
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, VALID_INDEX_ID, criteria);
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testZeroIndexId() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Index ID 0 might be valid (could be primary key index)
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, 0, IndexScanCriteria.unbounded());
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testNegativeIndexId() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Negative index ID - should be handled gracefully
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, -1, IndexScanCriteria.unbounded());
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testLargeIndexId() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Large index ID - should be handled gracefully
+ Supplier<CompletableFuture<Void>> action =
buildAction(internalTable, tx, Integer.MAX_VALUE,
IndexScanCriteria.unbounded());
+
+ assertThat(action.get(), willCompleteSuccessfully());
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testScanWithDataReturned() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Create mock binary rows to return
+ BinaryRow row1 = mock(BinaryRow.class);
+ BinaryRow row2 = mock(BinaryRow.class);
+ List<BinaryRow> scanResults = List.of(row1, row2);
+
+ // Mock replica service to return data for scan requests
+ when(replicaService.invoke(anyString(),
any(ReadWriteScanRetrieveBatchReplicaRequest.class)))
+ .thenReturn(completedFuture(scanResults));
+
+ Publisher<BinaryRow> publisher = internalTable.scan(0, tx,
VALID_INDEX_ID, IndexScanCriteria.unbounded());
+
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+
+ // Wait for scan to complete
+ assertThat(resultFuture, willCompleteSuccessfully());
+ List<BinaryRow> collectedRows = resultFuture.join();
+
+ // Verify that data was returned
+ assertThat(collectedRows, hasSize(2));
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testScanWithEmptyResult() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Mock replica service to return empty list
+ when(replicaService.invoke(anyString(),
any(ReadWriteScanRetrieveBatchReplicaRequest.class)))
+ .thenReturn((CompletableFuture)
emptyListCompletedFuture());
+
+ Publisher<BinaryRow> publisher = internalTable.scan(0, tx,
VALID_INDEX_ID, IndexScanCriteria.unbounded());
+
+ CompletableFuture<List<BinaryRow>> resultFuture =
subscribeToList(publisher);
+
+ // Wait for scan to complete
+ assertThat(resultFuture, willCompleteSuccessfully());
+ List<BinaryRow> collectedRows = resultFuture.join();
+
+ // Verify that empty result was returned
+ assertThat(collectedRows, hasSize(0));
+
+ commitTxAndAssertEnlistment(tx, internalTable);
+ }
+
+ @Test
+ void testScanWithCommittedTransactionThrowsException() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Mock transaction manager to mark transaction as committed
+ UUID txId = tx.id();
+ when(txManager.stateMeta(txId)).thenReturn(new TxStateMeta(
+ TxState.COMMITTED,
+ txId,
+ null,
+ null,
+ null,
+ null
+ ));
+
+ // Commit the transaction
+ assertDoesNotThrow(tx::commit);
+
+ // Try to scan with committed transaction - should throw
TransactionException
+ Publisher<BinaryRow> publisher =
internalTable.scan(VALID_PARTITION, tx, VALID_INDEX_ID,
IndexScanCriteria.unbounded());
+
+ CompletableFuture<Void> completed = new CompletableFuture<>();
+
+ publisher.subscribe(new BlackholeSubscriber(completed));
+
+ // Wait for error
+ try {
+ completed.get(10, TimeUnit.SECONDS);
+ fail("Expected TransactionException but scan completed
successfully");
+ } catch (Exception e) {
+ Throwable cause = unwrapRootCause(e);
+ assertThat("Error should be TransactionException", cause,
is(instanceOf(TransactionException.class)));
+ TransactionException txEx = (TransactionException) cause;
+ assertThat("Error code should be TX_ALREADY_FINISHED_ERR",
txEx.code(), is(TX_ALREADY_FINISHED_ERR));
+ }
+ }
+
+ @Test
+ void testScanWithAbortedTransactionThrowsException() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction tx = newReadWriteTransaction();
+
+ // Mock transaction manager to mark transaction as aborted
+ UUID txId = tx.id();
+ when(txManager.stateMeta(txId)).thenReturn(new TxStateMeta(
+ TxState.ABORTED,
+ txId,
+ null,
+ null,
+ null,
+ null
+ ));
+
+ // Rollback the transaction
+ assertDoesNotThrow(tx::rollback);
+
+ // Try to scan with aborted transaction - should throw
TransactionException
+ Publisher<BinaryRow> publisher =
internalTable.scan(VALID_PARTITION, tx, VALID_INDEX_ID,
IndexScanCriteria.unbounded());
+
+ CompletableFuture<Void> completed = new CompletableFuture<>();
+ publisher.subscribe(new BlackholeSubscriber(completed));
+
+ // Wait for error
+ try {
+ completed.get(10, TimeUnit.SECONDS);
+ fail("Expected TransactionException but scan completed
successfully");
+ } catch (Exception e) {
+ Throwable cause = unwrapRootCause(e);
+ assertThat("Error should be TransactionException", cause,
is(instanceOf(TransactionException.class)));
+ TransactionException txEx = (TransactionException) cause;
+ assertThat("Error code should be TX_ALREADY_FINISHED_ERR",
txEx.code(), is(TX_ALREADY_FINISHED_ERR));
+ }
+ }
+
+ @Test
+ void testScanWithReadOnlyTransactionThrowsException() {
+ InternalTableImpl internalTable = newInternalTable(TABLE_ID, 1);
+ InternalTransaction readOnlyTx = newReadOnlyTransaction();
+
+ try {
+ internalTable.scan(VALID_PARTITION, readOnlyTx,
VALID_INDEX_ID, IndexScanCriteria.unbounded());
+ fail("Expected exception when scanning with read-only
transaction");
+ } catch (TransactionException e) {
+ assertThat("Error code should be
TX_FAILED_READ_WRITE_OPERATION_ERR",
+ e.code(), is(TX_FAILED_READ_WRITE_OPERATION_ERR));
+ }
+ }
+ }
}