vldpyatkov commented on code in PR #1165: URL: https://github.com/apache/ignite-3/pull/1165#discussion_r991111995
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java: ########## @@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest request) { return processTxFinishAction((TxFinishReplicaRequest) request); } else if (request instanceof TxCleanupReplicaRequest) { return processTxCleanupAction((TxCleanupReplicaRequest) request); + } else if (request instanceof ReadOnlySingleRowReplicaRequest) { + return processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request); + } else if (request instanceof ReadOnlyMultiRowReplicaRequest) { + return processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request); + } else if (request instanceof ReadOnlyScanRetrieveBatchReplicaRequest) { + return processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest) request); } else { throw new UnsupportedReplicaRequestException(request.getClass()); } }); } + /** + * Processes retrieve batch for read only transaction. + * + * @param request Read only retrieve batch request. + * @return Result future. + */ + private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest request) { + UUID txId = request.transactionId(); + int batchCount = request.batchSize(); + + IgniteUuid cursorId = new IgniteUuid(txId, request.scanId()); + + PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> true, request.timestamp()); + + Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new BinaryRowCursorWrapper(timestampCursor)); + + ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount); + + for (int i = 0; i < batchCount && cursor.hasNext(); i++) { + batchRows.add(cursor.next()); + } + + return CompletableFuture.completedFuture(batchRows); + } + + /** + * Processes single entry request for read only transaction. + * + * @param request Read only single entry request. + * @return Result future. 
+ */ + private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) { + ByteBuffer searchKey = request.binaryRow().keySlice(); + + UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/); + + if (request.requestType() != RequestType.RO_GET) { + throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, + IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType())); + } + + //TODO: Use timestamp to find a row id. + RowId rowId = rowIdByKey(indexId, searchKey); + + BinaryRow result = rowId != null ? resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null; + + return CompletableFuture.completedFuture(result); + } + + /** + * Processes multiple entries request for read only transaction. + * + * @param request Read only multiple entries request. + * @return Result future. + */ + private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) { + Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br -> br.keySlice()).collect( + Collectors.toList()); + + UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/); + + if (request.requestType() != RequestType.RO_GET_ALL) { + throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR, + IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType())); + } + + ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size()); + + for (ByteBuffer searchKey : keyRows) { + //TODO: Use timestamp to find a row id. + RowId rowId = rowIdByKey(indexId, searchKey); + + result.add(rowId != null ? resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null); + } + + return CompletableFuture.completedFuture(result); + } + + /** + * Resolves a read result to the matched row. Review Comment: Added more description. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org