vldpyatkov commented on code in PR #1205:
URL: https://github.com/apache/ignite-3/pull/1205#discussion_r1011551341
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -569,6 +584,184 @@ private CompletableFuture<ArrayList<BinaryRow>>
processScanRetrieveBatchAction(R
});
}
+ /**
+ * Scans sorted index in RW tx.
+ *
+ * @param request Index scan request.
+ * @param indexStorage Index storage.
+ * @return Opreation future.
+ */
+ private CompletableFuture<List<BinaryRow>>
scanSortedIndex(ReadWriteScanRetrieveBatchReplicaRequest request,
+ SortedIndexStorage indexStorage) {
+ UUID txId = request.transactionId();
+ int batchCount = request.batchSize();
+
+ IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+ UUID indexId = request.indexToUse();
+
+ BinaryTuple lowerBound = request.lowerBound();
+ BinaryTuple upperBound = request.upperBound();
+
+ int flags = request.flags();
+
+ boolean includeUpperBound = (flags & SortedIndexStorage.LESS_OR_EQUAL)
!= 0;
+
+ return lockManager.acquire(txId, new LockKey(indexId),
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
+ return lockManager.acquire(txId, new LockKey(tableId),
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
+ @SuppressWarnings("resource") Cursor<IndexRow> cursor =
(Cursor<IndexRow>) cursors.computeIfAbsent(cursorId,
+ id -> {
+ // TODO
https://issues.apache.org/jira/browse/IGNITE-18057
+ // Fix scan cursor return item closet to
lowerbound and <= lowerbound
+ // to correctly lock range between lowerbound
value and the item next to lowerbound.
+ return indexStorage.scan(
+ lowerBound == null ? null :
BinaryTuplePrefix.fromBinaryTuple(lowerBound),
+ // We need upperBound next value for
correct range lock.
+ null,
//BinaryTuplePrefix.fromBinaryTuple(upperBound),
+ flags
+ );
+ });
+
+ IndexLocker indexLocker = indexesLockers.get().get(indexId);
+
+ final ArrayList<BinaryRow> result = new
ArrayList<>(batchCount);
+
+ return continueIndexScan(txId, indexId, indexLocker, cursor,
upperBound, includeUpperBound, batchCount, result)
+ .thenApply(ignore -> result);
+ });
+ });
+ }
+
+ /**
+ * Scans sorted index in RO tx.
+ *
+ * @param request Index scan request.
+ * @param indexStorage Index storage.
+ * @return Opreation future.
+ */
+ private CompletableFuture<List<BinaryRow>> scanSortedIndex(
+ ReadOnlyScanRetrieveBatchReplicaRequest request,
+ SortedIndexStorage indexStorage
+ ) {
+ UUID txId = request.transactionId();
+ int batchCount = request.batchSize();
+ HybridTimestamp timestamp = request.readTimestamp();
+
+ IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+ BinaryTuple lowerBound = request.lowerBound();
+ BinaryTuple upperBound = request.upperBound();
+
+ int flags = request.flags();
+
+ @SuppressWarnings("resource") Cursor<IndexRow> cursor =
(Cursor<IndexRow>) cursors.computeIfAbsent(cursorId,
+ id -> {
+ return indexStorage.scan(
+ lowerBound == null ? null :
BinaryTuplePrefix.fromBinaryTuple(lowerBound),
+ upperBound == null ? null :
BinaryTuplePrefix.fromBinaryTuple(upperBound),
+ flags
+ );
+ });
+
+ final ArrayList<BinaryRow> result = new ArrayList<>(batchCount);
+
+ return continueReadOnlyIndexScan(cursor, timestamp, batchCount, result)
+ .thenCompose(ignore ->
CompletableFuture.completedFuture(result));
+ }
+
+ CompletableFuture<Void> continueReadOnlyIndexScan(
+ Cursor<IndexRow> cursor,
+ HybridTimestamp timestamp,
+ int batchSize,
+ List<BinaryRow> result
+ ) {
+ if (result.size() >= batchSize || !cursor.hasNext()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ IndexRow indexRow = cursor.next();
+
+ RowId rowId = indexRow.rowId();
+
+ ReadResult readResult = mvDataStorage.read(rowId,
HybridTimestamp.MAX_VALUE);
+
+ return resolveReadResult(readResult, timestamp, () -> {
+ if (readResult.newestCommitTimestamp() == null) {
+ return null;
+ }
+
+ ReadResult committedReadResult = mvDataStorage.read(rowId,
readResult.newestCommitTimestamp());
+
+ assert !committedReadResult.isWriteIntent() :
+ "The result is not committed [rowId=" + rowId + ",
timestamp="
+ + readResult.newestCommitTimestamp() + ']';
+
+ return committedReadResult.binaryRow();
+ }).thenCompose(resolvedReadResult -> {
+ if (resolvedReadResult != null) {
+ result.add(resolvedReadResult);
+ }
+ return continueReadOnlyIndexScan(cursor, timestamp, batchSize,
result);
Review Comment:
I am not sure that we have a large enough stack to scan the entries one after
another recursively, especially when the batch size is not small.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1357,11 +1550,8 @@ private BinaryRow resolveReadResult(ReadResult
readResult, UUID txId) {
* @param lastCommitted Action to get the latest committed row.
* @return Future to resolved binary row.
*/
- private CompletableFuture<BinaryRow> resolveReadResult(
- ReadResult readResult,
- HybridTimestamp timestamp,
- Supplier<BinaryRow> lastCommitted
- ) {
+ private CompletableFuture<BinaryRow> resolveReadResult(ReadResult
readResult, HybridTimestamp timestamp,
+ Supplier<BinaryRow> lastCommitted) {
Review Comment:
Usually, we use a different format for the parameters when the line exceeds the length limit.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -569,6 +584,184 @@ private CompletableFuture<ArrayList<BinaryRow>>
processScanRetrieveBatchAction(R
});
}
+ /**
+ * Scans sorted index in RW tx.
+ *
+ * @param request Index scan request.
+ * @param indexStorage Index storage.
+ * @return Opreation future.
+ */
+ private CompletableFuture<List<BinaryRow>>
scanSortedIndex(ReadWriteScanRetrieveBatchReplicaRequest request,
+ SortedIndexStorage indexStorage) {
+ UUID txId = request.transactionId();
+ int batchCount = request.batchSize();
+
+ IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+ UUID indexId = request.indexToUse();
+
+ BinaryTuple lowerBound = request.lowerBound();
+ BinaryTuple upperBound = request.upperBound();
+
+ int flags = request.flags();
+
+ boolean includeUpperBound = (flags & SortedIndexStorage.LESS_OR_EQUAL)
!= 0;
+
+ return lockManager.acquire(txId, new LockKey(indexId),
LockMode.IS).thenCompose(idxLock -> { // Index IS lock
+ return lockManager.acquire(txId, new LockKey(tableId),
LockMode.IS).thenCompose(tblLock -> { // Table IS lock
+ @SuppressWarnings("resource") Cursor<IndexRow> cursor =
(Cursor<IndexRow>) cursors.computeIfAbsent(cursorId,
+ id -> {
+ // TODO
https://issues.apache.org/jira/browse/IGNITE-18057
+ // Fix scan cursor return item closet to
lowerbound and <= lowerbound
+ // to correctly lock range between lowerbound
value and the item next to lowerbound.
+ return indexStorage.scan(
+ lowerBound == null ? null :
BinaryTuplePrefix.fromBinaryTuple(lowerBound),
+ // We need upperBound next value for
correct range lock.
+ null,
//BinaryTuplePrefix.fromBinaryTuple(upperBound),
+ flags
+ );
+ });
+
+ IndexLocker indexLocker = indexesLockers.get().get(indexId);
+
+ final ArrayList<BinaryRow> result = new
ArrayList<>(batchCount);
+
+ return continueIndexScan(txId, indexId, indexLocker, cursor,
upperBound, includeUpperBound, batchCount, result)
+ .thenApply(ignore -> result);
+ });
+ });
+ }
+
+ /**
+ * Scans sorted index in RO tx.
+ *
+ * @param request Index scan request.
+ * @param indexStorage Index storage.
+ * @return Opreation future.
+ */
+ private CompletableFuture<List<BinaryRow>> scanSortedIndex(
+ ReadOnlyScanRetrieveBatchReplicaRequest request,
+ SortedIndexStorage indexStorage
+ ) {
+ UUID txId = request.transactionId();
+ int batchCount = request.batchSize();
+ HybridTimestamp timestamp = request.readTimestamp();
+
+ IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+ BinaryTuple lowerBound = request.lowerBound();
+ BinaryTuple upperBound = request.upperBound();
+
+ int flags = request.flags();
+
+ @SuppressWarnings("resource") Cursor<IndexRow> cursor =
(Cursor<IndexRow>) cursors.computeIfAbsent(cursorId,
+ id -> {
+ return indexStorage.scan(
+ lowerBound == null ? null :
BinaryTuplePrefix.fromBinaryTuple(lowerBound),
+ upperBound == null ? null :
BinaryTuplePrefix.fromBinaryTuple(upperBound),
+ flags
+ );
+ });
+
+ final ArrayList<BinaryRow> result = new ArrayList<>(batchCount);
+
+ return continueReadOnlyIndexScan(cursor, timestamp, batchCount, result)
+ .thenCompose(ignore ->
CompletableFuture.completedFuture(result));
+ }
+
+ CompletableFuture<Void> continueReadOnlyIndexScan(
+ Cursor<IndexRow> cursor,
+ HybridTimestamp timestamp,
+ int batchSize,
+ List<BinaryRow> result
+ ) {
+ if (result.size() >= batchSize || !cursor.hasNext()) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ IndexRow indexRow = cursor.next();
+
+ RowId rowId = indexRow.rowId();
+
+ ReadResult readResult = mvDataStorage.read(rowId,
HybridTimestamp.MAX_VALUE);
+
+ return resolveReadResult(readResult, timestamp, () -> {
+ if (readResult.newestCommitTimestamp() == null) {
+ return null;
+ }
+
+ ReadResult committedReadResult = mvDataStorage.read(rowId,
readResult.newestCommitTimestamp());
+
+ assert !committedReadResult.isWriteIntent() :
+ "The result is not committed [rowId=" + rowId + ",
timestamp="
+ + readResult.newestCommitTimestamp() + ']';
+
+ return committedReadResult.binaryRow();
+ }).thenCompose(resolvedReadResult -> {
+ if (resolvedReadResult != null) {
+ result.add(resolvedReadResult);
+ }
+ return continueReadOnlyIndexScan(cursor, timestamp, batchSize,
result);
Review Comment:
Are we hoping that Java will optimize the tail recursion?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]