vldpyatkov commented on code in PR #1165:
URL: https://github.com/apache/ignite-3/pull/1165#discussion_r991111995


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes a batch retrieval request of a scan within a read only transaction.
+     *
+     * <p>The scan cursor is identified by the pair of the transaction id and the scan id. It is opened over
+     * the partition storage at the request timestamp when no cursor is registered under that id yet, and is
+     * taken from {@code cursors} otherwise.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Future completed with the list of rows of the next batch. The list holds at most
+     *     {@code request.batchSize()} rows and is empty when the cursor has no more rows.
+     */
+    private CompletableFuture<Object> processReadOnlyScanRetrieveBatchAction(
+            ReadOnlyScanRetrieveBatchReplicaRequest request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        // The storage scan is opened inside the mapping function, so a storage cursor is created only when
+        // no cursor is registered for this scan yet. Opening it before the lookup would create a new storage
+        // cursor on every batch request and discard it unused whenever the scan is already registered.
+        Cursor<BinaryRow> cursor = cursors.computeIfAbsent(
+                cursorId,
+                id -> new BinaryRowCursorWrapper(mvDataStorage.scan(row -> true, request.timestamp()))
+        );
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            batchRows.add(cursor.next());
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Handles a single row read issued by a read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Future completed with the row read at the request timestamp, or with {@code null} when no row id
+     *     is found for the requested key.
+     */
+    private CompletableFuture<Object> processReadOnlySingleEntryAction(
+            ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer keyBytes = request.binaryRow().keySlice();
+
+        UUID idxId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        // Only the RO_GET request type is served by this handler.
+        if (request.requestType() != RequestType.RO_GET) {
+            throw new IgniteInternalException(
+                    Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType())
+            );
+        }
+
+        //TODO: Use timestamp to find a row id.
+        RowId foundRowId = rowIdByKey(idxId, keyBytes);
+
+        if (foundRowId == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        BinaryRow row = resolveWriteIntent(mvDataStorage.read(foundRowId, request.timestamp()));
+
+        return CompletableFuture.completedFuture(row);
+    }
+
+    /**
+     * Processes multiple entries request for read only transaction.
+     *
+     * <p>The result list has one element per requested row, in the iteration order of
+     * {@code request.binaryRows()}; an element is {@code null} when no row id is found for the corresponding key.
+     *
+     * @param request Read only multiple entries request.
+     * @return Future completed with the list of rows read at the request timestamp.
+     */
+    private CompletableFuture<Object> processReadOnlyMultiEntryAction(
+            ReadOnlyMultiRowReplicaRequest request) {
+        Collection<ByteBuffer> keyRows = request.binaryRows().stream()
+                .map(br -> br.keySlice())
+                .collect(Collectors.toList());
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        // Only the RO_GET_ALL request type is served by this handler. The message names the multi row request,
+        // so that a failure here is distinguishable from a failure of the single row handler.
+        if (request.requestType() != RequestType.RO_GET_ALL) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown multi request [actionType={}]", request.requestType()));
+        }
+
+        ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
+
+        for (ByteBuffer searchKey : keyRows) {
+            //TODO: Use timestamp to find a row id.
+            RowId rowId = rowIdByKey(indexId, searchKey);
+
+            result.add(rowId != null ? resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null);
+        }
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Resolves a read result to the matched row.

Review Comment:
   Added more description.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to