sanpwc commented on code in PR #1165:
URL: https://github.com/apache/ignite-3/pull/1165#discussion_r989822321


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/ScanRetrieveBatchReplicaRequest.java:
##########
@@ -33,11 +34,52 @@ public interface ScanRetrieveBatchReplicaRequest extends 
ReplicaRequest {
     long scanId();
 
     /**
-     * Gets a scan row filter. The filter has a sense only for the first 
request, for the second one and the followings the field is
-     * ignored.
+     * Gets an index to use fot the retrieve request.
      *
-     * @return Row filter predicate.
+     * @return Index id.
      */
     @Marshallable
-    Predicate<BinaryRow> rowFilter();
+    UUID indexToUse();
+
+    /**
+     * Gets a key which is used for exact comparison in the index.

Review Comment:
   Please reuse javadocs from 
org.apache.ignite.internal.storage.index.SortedIndexStorage wherever possible.
   
   ```
        * @param lowerBound Lower bound. Exclusivity is controlled by a {@link 
#GREATER_OR_EQUAL} or {@link #GREATER} flag.
        *      {@code null} means unbounded.
        * @param upperBound Upper bound. Exclusivity is controlled by a {@link 
#LESS} or {@link #LESS_OR_EQUAL} flag.
        *      {@code null} means unbounded.
        * @param flags Control flags. {@link #GREATER} | {@link #LESS} by 
default. Other available values
        *      are {@link #GREATER_OR_EQUAL}, {@link #LESS_OR_EQUAL}.
   
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest 
request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> 
true, request.timestamp());

Review Comment:
   Please add a TODO for the Predicate removal 
(https://issues.apache.org/jira/browse/IGNITE-17849), both here and in 
processScanRetrieveBatchAction.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1665,15 +1665,15 @@ public boolean onUpdate(@NotNull WatchEvent evt) {
                         return true;
                     }
 
-                    int part = 
extractPartitionNumber(pendingAssignmentsWatchEvent.key());
+                    int partId = 
extractPartitionNumber(pendingAssignmentsWatchEvent.key());

Review Comment:
   Let's name it partIdx. What do you think?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -1051,4 +1158,62 @@ private CompletableFuture<Void> 
ensureReplicaIsPrimary(ReplicaRequest request) {
             return CompletableFuture.completedFuture(null);
         }
     }
+
+    /**
+     * Cursor wrapper to resolve a write intent, which may be required for 
scan by timestamp.
+     */
+    private class BinaryRowCursorWrapper implements Cursor<BinaryRow> {

Review Comment:
   Why do we need it?
   
   Why not use
   ```
           for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
               batchRows.add(resolveWriteIntent(cursor.next()));
           }
   ```
   where cursor is a plain PartitionTimestampCursor?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest 
request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> 
true, request.timestamp());
+
+        Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new 
BinaryRowCursorWrapper(timestampCursor));
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            batchRows.add(cursor.next());
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Processes single entry request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer searchKey = request.binaryRow().keySlice();
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        //TODO: Use timestamp to find a row id.
+        RowId rowId = rowIdByKey(indexId, searchKey);
+
+        BinaryRow result = rowId != null ? 
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null;
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Processes multiple entries request for read only transaction.
+     *
+     * @param request Read only multiple entries request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
+        Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br 
-> br.keySlice()).collect(
+                Collectors.toList());
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET_ALL) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
+
+        for (ByteBuffer searchKey : keyRows) {
+            //TODO: Use timestamp to find a row id.
+            RowId rowId = rowIdByKey(indexId, searchKey);
+
+            result.add(rowId != null ? 
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null);
+        }
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Resolves a read result to the matched row.
+     * If the result dos not math any row, the method return {@code null}.
+     * TODO: Use a write intent resolution procedure to get a row form {@link 
ReadResult}.
+     *
+     * @param readResult Read result.
+     * @return {@link BinaryRow} or {@code null}.
+     */
+    private BinaryRow resolveWriteIntent(ReadResult readResult) {
+        if (readResult == null) {

Review Comment:
   Why do we expect null readResult here?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest 
request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> 
true, request.timestamp());
+
+        Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new 
BinaryRowCursorWrapper(timestampCursor));
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            batchRows.add(cursor.next());
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Processes single entry request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer searchKey = request.binaryRow().keySlice();
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        //TODO: Use timestamp to find a row id.
+        RowId rowId = rowIdByKey(indexId, searchKey);

Review Comment:
   There may be multiple rowIds for the same key. For example:
   k1 -> rowId1
   tx begin
     remove k1
     insert k1 -> rowId2
   tx commit
   
   As a result, there are two k1 entries in the index.
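   
   To illustrate the scenario above, here is a minimal toy sketch. It is not 
   Ignite's storage API: MultiRowIdLookupSketch, Version, insert and read are 
   hypothetical names, and each row id is simplified to a single version with 
   a visibility interval. It only shows that a lookup by key has to examine 
   every row id the index returns, not just one.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: none of these types are Ignite's storage API. */
final class MultiRowIdLookupSketch {
    /** One row version, visible for timestamps in [beginTs, endTs). */
    static final class Version {
        final String value;
        final long beginTs;
        final long endTs;

        Version(String value, long beginTs, long endTs) {
            this.value = value;
            this.beginTs = beginTs;
            this.endTs = endTs;
        }
    }

    /** Index: key -> all row ids inserted under that key and not yet cleaned up. */
    private final Map<String, List<Integer>> index = new HashMap<>();

    /** Row storage: row id -> its single version (a simplification). */
    private final Map<Integer, Version> rows = new HashMap<>();

    void insert(String key, int rowId, String value, long beginTs, long endTs) {
        index.computeIfAbsent(key, k -> new ArrayList<>()).add(rowId);
        rows.put(rowId, new Version(value, beginTs, endTs));
    }

    /** Checks every row id found for the key; returns the value visible at ts, or null. */
    String read(String key, long ts) {
        for (int rowId : index.getOrDefault(key, Collections.emptyList())) {
            Version v = rows.get(rowId);

            if (v.beginTs <= ts && ts < v.endTs) {
                return v.value;
            }
        }

        return null;
    }

    public static void main(String[] args) {
        MultiRowIdLookupSketch storage = new MultiRowIdLookupSketch();

        // k1 -> rowId1, removed by a transaction that commits at ts 20.
        storage.insert("k1", 1, "old", 10, 20);
        // The same transaction inserts k1 again under a new row id.
        storage.insert("k1", 2, "new", 20, Long.MAX_VALUE);

        System.out.println(storage.read("k1", 15));
        System.out.println(storage.read("k1", 25));
    }
}
```

   In this model, a lookup that stopped at the first row id would miss the 
   re-inserted row for any timestamp after the commit.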



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest 
request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> 
true, request.timestamp());
+
+        Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new 
BinaryRowCursorWrapper(timestampCursor));
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            batchRows.add(cursor.next());
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Processes single entry request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer searchKey = request.binaryRow().keySlice();
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        //TODO: Use timestamp to find a row id.
+        RowId rowId = rowIdByKey(indexId, searchKey);
+
+        BinaryRow result = rowId != null ? 
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null;
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Processes multiple entries request for read only transaction.
+     *
+     * @param request Read only multiple entries request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
+        Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br 
-> br.keySlice()).collect(
+                Collectors.toList());
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET_ALL) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
+
+        for (ByteBuffer searchKey : keyRows) {
+            //TODO: Use timestamp to find a row id.
+            RowId rowId = rowIdByKey(indexId, searchKey);
+
+            result.add(rowId != null ? 
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null);
+        }
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Resolves a read result to the matched row.

Review Comment:
   Please add more details on how writeIntent resolution works.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java:
##########
@@ -58,4 +58,19 @@ public class TableMessageGroup {
      * Message type for {@link 
org.apache.ignite.internal.table.distributed.message.HasDataResponse}.
      */
     public static final int HAS_DATA_RESPONSE = 6;
+
+    /**
+     * Message type for {@link 
org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest}.
+     */
+    public static final short RO_SINGLE_ROW_REPLICA_REQUEST = 7;

Review Comment:
   To be consistent with the latest changes, let's use the simple constant 
form (without public static) and the short link form, {@link 
ReadOnlySingleRowReplicaRequest}, with the corresponding imports added.
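   
   A rough sketch of what I mean, assuming the message group is declared as 
   an interface (the diff above does not confirm that). TableMessageGroupSketch 
   is a hypothetical name, and the empty ReadOnlySingleRowReplicaRequest 
   interface is only a stand-in so that the short link resolves here:

```java
/** Hypothetical stand-in so that the short {@link} below resolves in this sketch. */
interface ReadOnlySingleRowReplicaRequest {
}

/** Sketch only: assumes the message group is declared as an interface. */
interface TableMessageGroupSketch {
    /** Message type for {@link ReadOnlySingleRowReplicaRequest}. */
    short RO_SINGLE_ROW_REPLICA_REQUEST = 7;
}
```

   Interface fields are implicitly public, static and final, so the modifiers 
   can be dropped without changing how the constant is used.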



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest 
request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> 
true, request.timestamp());
+
+        Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new 
BinaryRowCursorWrapper(timestampCursor));
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            batchRows.add(cursor.next());
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Processes single entry request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer searchKey = request.binaryRow().keySlice();
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);

Review Comment:
   Please add a TODO: https://issues.apache.org/jira/browse/IGNITE-17748



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest 
request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> 
true, request.timestamp());
+
+        Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new 
BinaryRowCursorWrapper(timestampCursor));
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            batchRows.add(cursor.next());
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Processes single entry request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer searchKey = request.binaryRow().keySlice();
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        //TODO: Use timestamp to find a row id.
+        RowId rowId = rowIdByKey(indexId, searchKey);
+
+        BinaryRow result = rowId != null ? 
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null;
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Processes multiple entries request for read only transaction.
+     *
+     * @param request Read only multiple entries request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {

Review Comment:
   Same as for the single-key case.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest 
request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> 
true, request.timestamp());
+
+        Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new 
BinaryRowCursorWrapper(timestampCursor));
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            batchRows.add(cursor.next());
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Processes single entry request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer searchKey = request.binaryRow().keySlice();
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        //TODO: Use timestamp to find a row id.

Review Comment:
   What do you mean?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest 
request) {
                         return processTxFinishAction((TxFinishReplicaRequest) 
request);
                     } else if (request instanceof TxCleanupReplicaRequest) {
                         return 
processTxCleanupAction((TxCleanupReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlySingleRowReplicaRequest) {
+                        return 
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyMultiRowReplicaRequest) {
+                        return 
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+                    } else if (request instanceof 
ReadOnlyScanRetrieveBatchReplicaRequest) {
+                        return 
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
 request);
                     } else {
                         throw new 
UnsupportedReplicaRequestException(request.getClass());
                     }
                 });
     }
 
+    /**
+     * Processes retrieve batch for read only transaction.
+     *
+     * @param request Read only retrieve batch request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest 
request) {
+        UUID txId = request.transactionId();
+        int batchCount = request.batchSize();
+
+        IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+        PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row -> 
true, request.timestamp());
+
+        Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new 
BinaryRowCursorWrapper(timestampCursor));
+
+        ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+        for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+            batchRows.add(cursor.next());
+        }
+
+        return CompletableFuture.completedFuture(batchRows);
+    }
+
+    /**
+     * Processes single entry request for read only transaction.
+     *
+     * @param request Read only single entry request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+        ByteBuffer searchKey = request.binaryRow().keySlice();
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        //TODO: Use timestamp to find a row id.
+        RowId rowId = rowIdByKey(indexId, searchKey);
+
+        BinaryRow result = rowId != null ? 
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null;
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Processes multiple entries request for read only transaction.
+     *
+     * @param request Read only multiple entries request.
+     * @return Result future.
+     */
+    private CompletableFuture<Object> 
processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
+        Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br 
-> br.keySlice()).collect(
+                Collectors.toList());
+
+        UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+        if (request.requestType() !=  RequestType.RO_GET_ALL) {
+            throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+                    IgniteStringFormatter.format("Unknown single request 
[actionType={}]", request.requestType()));
+        }
+
+        ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
+
+        for (ByteBuffer searchKey : keyRows) {
+            //TODO: Use timestamp to find a row id.
+            RowId rowId = rowIdByKey(indexId, searchKey);
+
+            result.add(rowId != null ? 
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null);
+        }
+
+        return CompletableFuture.completedFuture(result);
+    }
+
+    /**
+     * Resolves a read result to the matched row.
+     * If the result dos not math any row, the method return {@code null}.
+     * TODO: Use a write intent resolution procedure to get a row form {@link 
ReadResult}.
+     *
+     * @param readResult Read result.
+     * @return {@link BinaryRow} or {@code null}.
+     */
+    private BinaryRow resolveWriteIntent(ReadResult readResult) {
+        if (readResult == null) {
+            return null;
+        }
+
+        return readResult.binaryRow();

Review Comment:
   I'd rather check if binaryRow is null and throw an IllegalStateException or 
similar until writeIntent resolution is implemented.
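   
   Something along these lines, as a sketch only. ReadResultStub is a 
   hypothetical stand-in for ReadResult that models nothing but binaryRow(), 
   a byte array stands in for BinaryRow, and the exception message is just a 
   suggestion:

```java
/** Sketch of the suggested guard; the types here are stand-ins, not Ignite classes. */
final class ResolveWriteIntentSketch {
    /** Hypothetical stand-in for ReadResult; only binaryRow() is modeled. */
    static final class ReadResultStub {
        private final byte[] row;

        ReadResultStub(byte[] row) {
            this.row = row;
        }

        byte[] binaryRow() {
            return row;
        }
    }

    /** Fails fast instead of silently returning null while resolution is not implemented. */
    static byte[] resolveWriteIntent(ReadResultStub readResult) {
        byte[] row = readResult.binaryRow();

        if (row == null) {
            throw new IllegalStateException("Write intent resolution is not implemented yet.");
        }

        return row;
    }

    public static void main(String[] args) {
        byte[] row = {1, 2, 3};

        System.out.println(resolveWriteIntent(new ReadResultStub(row)).length);

        try {
            resolveWriteIntent(new ReadResultStub(null));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```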



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
