This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8400cc6474b IGNITE-28349 Move all remaining handlers to the request
handler registry in `PartitionReplicaListener` (#7867)
8400cc6474b is described below
commit 8400cc6474ba1f2a5c06342591a48ba292d0bdef
Author: Ivan Zlenko <[email protected]>
AuthorDate: Wed Mar 25 12:46:10 2026 +0500
IGNITE-28349 Move all remaining handlers to the request handler registry in
`PartitionReplicaListener` (#7867)
---
.../replicator/PartitionReplicaListener.java | 195 ++++++++++++---------
1 file changed, 114 insertions(+), 81 deletions(-)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index c45b63708ab..b2e29dfff44 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -477,6 +477,42 @@ public class PartitionReplicaListener implements ReplicaTableProcessor {
(ReplicaRequestHandler<ReadOnlyScanRetrieveBatchReplicaRequest>) (req, primacy)
->
processReadOnlyScanRetrieveBatchAction(req,
primacy.isPrimary()));
+ handlersBuilder.addHandler(
+ PartitionReplicationMessageGroup.GROUP_TYPE,
+ PartitionReplicationMessageGroup.RW_SINGLE_ROW_REPLICA_REQUEST,
+ (ReplicaRequestHandler<ReadWriteSingleRowReplicaRequest>)
(req, primacy) ->
+ processRwSingleRowRequest(req, primacy));
+
+ handlersBuilder.addHandler(
+ PartitionReplicationMessageGroup.GROUP_TYPE,
+
PartitionReplicationMessageGroup.RW_SINGLE_ROW_PK_REPLICA_REQUEST,
+ (ReplicaRequestHandler<ReadWriteSingleRowPkReplicaRequest>)
(req, primacy) ->
+ processRwSingleRowPkRequest(req, primacy));
+
+ handlersBuilder.addHandler(
+ PartitionReplicationMessageGroup.GROUP_TYPE,
+ PartitionReplicationMessageGroup.RW_MULTI_ROW_REPLICA_REQUEST,
+ (ReplicaRequestHandler<ReadWriteMultiRowReplicaRequest>) (req,
primacy) ->
+ processRwMultiRowRequest(req, primacy));
+
+ handlersBuilder.addHandler(
+ PartitionReplicationMessageGroup.GROUP_TYPE,
+
PartitionReplicationMessageGroup.RW_MULTI_ROW_PK_REPLICA_REQUEST,
+ (ReplicaRequestHandler<ReadWriteMultiRowPkReplicaRequest>)
(req, primacy) ->
+ processRwMultiRowPkRequest(req, primacy));
+
+ handlersBuilder.addHandler(
+ PartitionReplicationMessageGroup.GROUP_TYPE,
+ PartitionReplicationMessageGroup.RW_DUAL_ROW_REPLICA_REQUEST,
+ (ReplicaRequestHandler<ReadWriteSwapRowReplicaRequest>) (req,
primacy) ->
+ processRwSwapRowRequest(req, primacy));
+
+ handlersBuilder.addHandler(
+ PartitionReplicationMessageGroup.GROUP_TYPE,
+
PartitionReplicationMessageGroup.RW_SCAN_RETRIEVE_BATCH_REPLICA_REQUEST,
+
(ReplicaRequestHandler<ReadWriteScanRetrieveBatchReplicaRequest>) (req,
primacy) ->
+ processRwScanRetrieveBatchRequest(req));
+
requestHandlers = handlersBuilder.build();
}
@@ -575,89 +611,86 @@ public class PartitionReplicaListener implements ReplicaTableProcessor {
return roHandler.handle(request, opStartTsIfDirectRo);
}
- if (request instanceof ReadWriteSingleRowReplicaRequest) {
- var req = (ReadWriteSingleRowReplicaRequest) request;
-
- return appendTxCommand(
- req.transactionId(),
- req.requestType(),
- req.full(),
- () -> processSingleEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
- (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
- );
- } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
- var req = (ReadWriteSingleRowPkReplicaRequest) request;
-
- return appendTxCommand(
- req.transactionId(),
- req.requestType(),
- req.full(),
- () -> processSingleEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
- (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
- );
- } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
- var req = (ReadWriteMultiRowReplicaRequest) request;
-
- return appendTxCommand(
- req.transactionId(),
- req.requestType(),
- req.full(),
- () -> processMultiEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
- (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
- );
- } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
- var req = (ReadWriteMultiRowPkReplicaRequest) request;
-
- return appendTxCommand(
- req.transactionId(),
- req.requestType(),
- req.full(),
- () -> processMultiEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
- (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
- );
- } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
- var req = (ReadWriteSwapRowReplicaRequest) request;
-
- return appendTxCommand(
- req.transactionId(),
- req.requestType(),
- req.full(),
- () -> processTwoEntriesAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
- (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
- );
- } else if (request instanceof
ReadWriteScanRetrieveBatchReplicaRequest) {
- var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
-
- // Scan's request.full() has a slightly different semantics than
the same field in other requests -
- // it identifies an implicit transaction. Please note that
request.full() is always false in the following `appendTxCommand`.
- // We treat SCAN as 2pc and only switch to a 1pc mode if all table
rows fit in the bucket and the transaction is implicit.
- // See `req.full() && (err != null || rows.size() <
req.batchSize())` condition.
- // If they don't fit the bucket, the transaction is treated as 2pc.
- replicaTouch(req.transactionId(), req.coordinatorId(),
req.commitPartitionId().asZonePartitionId(), req.txLabel());
-
- // Implicit RW scan can be committed locally on a last batch or
error.
- return appendTxCommand(req.transactionId(), RW_SCAN, false, () ->
processScanRetrieveBatchAction(req))
- .thenCompose(rows -> {
- if (allElementsAreNull(rows)) {
- return completedFuture(rows);
- } else {
- return
validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
- .thenApply(ignored -> {
- metrics.onRead(rows.size(), false,
true);
+ throw new UnsupportedReplicaRequestException(request.getClass());
+ }
- return rows;
- });
- }
- })
- .whenComplete((rows, err) -> {
- if (req.full() && (err != null || rows.size() <
req.batchSize())) {
- releaseTxLocks(req.transactionId());
- }
- });
- }
+ private CompletableFuture<?>
processRwSingleRowRequest(ReadWriteSingleRowReplicaRequest req, ReplicaPrimacy
primacy) {
+ return appendTxCommand(
+ req.transactionId(),
+ req.requestType(),
+ req.full(),
+ () -> processSingleEntryAction(req,
primacy.leaseStartTime()).whenComplete(
+ (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
+ );
+ }
- // Unknown request.
- throw new UnsupportedReplicaRequestException(request.getClass());
+ private CompletableFuture<?>
processRwSingleRowPkRequest(ReadWriteSingleRowPkReplicaRequest req,
ReplicaPrimacy primacy) {
+ return appendTxCommand(
+ req.transactionId(),
+ req.requestType(),
+ req.full(),
+ () -> processSingleEntryAction(req,
primacy.leaseStartTime()).whenComplete(
+ (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
+ );
+ }
+
+ private CompletableFuture<?>
processRwMultiRowRequest(ReadWriteMultiRowReplicaRequest req, ReplicaPrimacy
primacy) {
+ return appendTxCommand(
+ req.transactionId(),
+ req.requestType(),
+ req.full(),
+ () -> processMultiEntryAction(req,
primacy.leaseStartTime()).whenComplete(
+ (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
+ );
+ }
+
+ private CompletableFuture<?>
processRwMultiRowPkRequest(ReadWriteMultiRowPkReplicaRequest req,
ReplicaPrimacy primacy) {
+ return appendTxCommand(
+ req.transactionId(),
+ req.requestType(),
+ req.full(),
+ () -> processMultiEntryAction(req,
primacy.leaseStartTime()).whenComplete(
+ (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
+ );
+ }
+
+ private CompletableFuture<?>
processRwSwapRowRequest(ReadWriteSwapRowReplicaRequest req, ReplicaPrimacy
primacy) {
+ return appendTxCommand(
+ req.transactionId(),
+ req.requestType(),
+ req.full(),
+ () -> processTwoEntriesAction(req,
primacy.leaseStartTime()).whenComplete(
+ (r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
+ );
+ }
+
+ private CompletableFuture<?>
processRwScanRetrieveBatchRequest(ReadWriteScanRetrieveBatchReplicaRequest req)
{
+ // Scan's request.full() has a slightly different semantics than the
same field in other requests -
+ // it identifies an implicit transaction. Please note that
request.full() is always false in the following `appendTxCommand`.
+ // We treat SCAN as 2pc and only switch to a 1pc mode if all table
rows fit in the bucket and the transaction is implicit.
+ // See `req.full() && (err != null || rows.size() < req.batchSize())`
condition.
+ // If they don't fit the bucket, the transaction is treated as 2pc.
+ replicaTouch(req.transactionId(), req.coordinatorId(),
req.commitPartitionId().asZonePartitionId(), req.txLabel());
+
+ // Implicit RW scan can be committed locally on a last batch or error.
+ return appendTxCommand(req.transactionId(), RW_SCAN, false, () ->
processScanRetrieveBatchAction(req))
+ .thenCompose(rows -> {
+ if (allElementsAreNull(rows)) {
+ return completedFuture(rows);
+ } else {
+ return
validateRwReadAgainstSchemaAfterTakingLocks(req.transactionId())
+ .thenApply(ignored -> {
+ metrics.onRead(rows.size(), false, true);
+
+ return rows;
+ });
+ }
+ })
+ .whenComplete((rows, err) -> {
+ if (req.full() && (err != null || rows.size() <
req.batchSize())) {
+ releaseTxLocks(req.transactionId());
+ }
+ });
}
/**