sanpwc commented on code in PR #1165:
URL: https://github.com/apache/ignite-3/pull/1165#discussion_r990193322
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -182,12 +189,112 @@ public CompletableFuture<Object> invoke(ReplicaRequest
request) {
return processTxFinishAction((TxFinishReplicaRequest)
request);
} else if (request instanceof TxCleanupReplicaRequest) {
return
processTxCleanupAction((TxCleanupReplicaRequest) request);
+ } else if (request instanceof
ReadOnlySingleRowReplicaRequest) {
+ return
processReadOnlySingleEntryAction((ReadOnlySingleRowReplicaRequest) request);
+ } else if (request instanceof
ReadOnlyMultiRowReplicaRequest) {
+ return
processReadOnlyMultiEntryAction((ReadOnlyMultiRowReplicaRequest) request);
+ } else if (request instanceof
ReadOnlyScanRetrieveBatchReplicaRequest) {
+ return
processReadOnlyScanRetrieveBatchAction((ReadOnlyScanRetrieveBatchReplicaRequest)
request);
} else {
throw new
UnsupportedReplicaRequestException(request.getClass());
}
});
}
+ /**
+ * Processes retrieve batch for read only transaction.
+ *
+ * @param request Read only retrieve batch request.
+ * @return Result future.
+ */
+ private CompletableFuture<Object>
processReadOnlyScanRetrieveBatchAction(ReadOnlyScanRetrieveBatchReplicaRequest
request) {
+ UUID txId = request.transactionId();
+ int batchCount = request.batchSize();
+
+ IgniteUuid cursorId = new IgniteUuid(txId, request.scanId());
+
+ PartitionTimestampCursor timestampCursor = mvDataStorage.scan(row ->
true, request.timestamp());
+
+ Cursor<BinaryRow> cursor = cursors.computeIfAbsent(cursorId, id -> new
BinaryRowCursorWrapper(timestampCursor));
+
+ ArrayList<BinaryRow> batchRows = new ArrayList<>(batchCount);
+
+ for (int i = 0; i < batchCount && cursor.hasNext(); i++) {
+ batchRows.add(cursor.next());
+ }
+
+ return CompletableFuture.completedFuture(batchRows);
+ }
+
+ /**
+ * Processes single entry request for read only transaction.
+ *
+ * @param request Read only single entry request.
+ * @return Result future.
+ */
+ private CompletableFuture<Object>
processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
+ ByteBuffer searchKey = request.binaryRow().keySlice();
+
+ UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+ if (request.requestType() != RequestType.RO_GET) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ IgniteStringFormatter.format("Unknown single request
[actionType={}]", request.requestType()));
+ }
+
+ //TODO: Use timestamp to find a row id.
+ RowId rowId = rowIdByKey(indexId, searchKey);
+
+ BinaryRow result = rowId != null ?
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null;
+
+ return CompletableFuture.completedFuture(result);
+ }
+
+ /**
+ * Processes multiple entries request for read only transaction.
+ *
+ * @param request Read only multiple entries request.
+ * @return Result future.
+ */
+ private CompletableFuture<Object>
processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
+ Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br
-> br.keySlice()).collect(
+ Collectors.toList());
+
+ UUID indexId = indexIdOrDefault(indexScanId/*request.indexToUse()*/);
+
+ if (request.requestType() != RequestType.RO_GET_ALL) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ IgniteStringFormatter.format("Unknown single request
[actionType={}]", request.requestType()));
+ }
+
+ ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
+
+ for (ByteBuffer searchKey : keyRows) {
+ //TODO: Use timestamp to find a row id.
+ RowId rowId = rowIdByKey(indexId, searchKey);
+
+ result.add(rowId != null ?
resolveWriteIntent(mvDataStorage.read(rowId, request.timestamp())) : null);
+ }
+
+ return CompletableFuture.completedFuture(result);
+ }
+
+ /**
+ * Resolves a read result to the matched row.
+ * If the result does not match any row, the method returns {@code null}.
+ * TODO: Use a write intent resolution procedure to get a row from {@link
ReadResult}.
+ *
+ * @param readResult Read result.
+ * @return {@link BinaryRow} or {@code null}.
+ */
+ private BinaryRow resolveWriteIntent(ReadResult readResult) {
+ if (readResult == null) {
+ return null;
+ }
+
+ return readResult.binaryRow();
Review Comment:
I'd rather check whether `binaryRow` is null and throw an `IllegalStateException`
(or similar) until write-intent resolution is implemented.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]