This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6d97e9de7e9 IGNITE-28537 Remove duplicates from
PartitionReplicaListener (#7985)
6d97e9de7e9 is described below
commit 6d97e9de7e9639586833322a497babdc5c95dec4
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Apr 14 12:59:14 2026 +0400
IGNITE-28537 Remove duplicates from PartitionReplicaListener (#7985)
---
.../replicator/PartitionReplicaListener.java | 122 ++++++++-------------
1 file changed, 46 insertions(+), 76 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0f816b845b0..f173d9ad228 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -122,6 +122,7 @@ import
org.apache.ignite.internal.partition.replicator.network.command.TimedBina
import
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessageBuilder;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandBase;
import
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2Builder;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
import
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
@@ -2622,48 +2623,22 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
);
if (!cmd.full()) {
- if (skipDelayedAck) {
- if (!SKIP_UPDATES) {
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
- cmd.commitPartitionId().asReplicationGroupId(),
- cmd.rowToUpdate(),
- true,
- null,
- null,
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
- }
-
- return applyCmdWithExceptionHandling(cmd).thenApply(res ->
null);
- } else {
- if (!SKIP_UPDATES) {
- // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdate(
- cmd.txId(),
- cmd.rowUuid(),
- cmd.commitPartitionId().asReplicationGroupId(),
- cmd.rowToUpdate(),
- true,
- null,
- null,
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
- }
-
- CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
- if (e != null) {
- throw new DelayedAckException(cmd.txId(),
unwrapCause(e), txManager);
- }
-
- return cmd.txId();
- });
-
- return completedFuture(new CommandApplicationResult(null,
repFut));
+ if (!SKIP_UPDATES) {
+ // We don't need to take the partition snapshots read lock,
see #INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdate(
+ cmd.txId(),
+ cmd.rowUuid(),
+ cmd.commitPartitionId().asReplicationGroupId(),
+ cmd.rowToUpdate(),
+ true,
+ null,
+ null,
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
}
+
+ return applyCmdRespectingDelayedAck(cmd, skipDelayedAck);
} else {
return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;
@@ -2737,6 +2712,24 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
);
}
+ private CompletableFuture<CommandApplicationResult>
applyCmdRespectingDelayedAck(UpdateCommandBase cmd, boolean skipDelayedAck) {
+ assert !cmd.full() : "Only non-full commands are supported here";
+
+ if (skipDelayedAck) {
+ return applyCmdWithExceptionHandling(cmd).thenApply(res -> null);
+ } else {
+ CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
+ if (e != null) {
+ throw new DelayedAckException(cmd.txId(), unwrapCause(e),
txManager);
+ }
+
+ return cmd.txId();
+ });
+
+ return completedFuture(new CommandApplicationResult(null, repFut));
+ }
+ }
+
/**
* Executes an UpdateAll command.
*
@@ -2771,41 +2764,18 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
);
if (!cmd.full()) {
- if (skipDelayedAck) {
- // We don't need to take the partition snapshots read lock,
see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
- cmd.commitPartitionId().asReplicationGroupId(),
- true,
- null,
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
-
- return applyCmdWithExceptionHandling(cmd).thenApply(res ->
null);
- } else {
- // We don't need to take the partition snapshots read lock,
see #INTERNAL_DOC_PLACEHOLDER why.
- storageUpdateHandler.handleUpdateAll(
- cmd.txId(),
- cmd.rowsToUpdate(),
- cmd.commitPartitionId().asReplicationGroupId(),
- true,
- null,
- null,
- indexIdsAtRwTxBeginTs(txId)
- );
- }
-
- CompletableFuture<UUID> repFut =
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
- if (e != null) {
- throw new DelayedAckException(cmd.txId(), unwrapCause(e),
txManager);
- }
-
- return cmd.txId();
- });
-
- return completedFuture(new CommandApplicationResult(null, repFut));
+ // We don't need to take the partition snapshots read lock, see
#INTERNAL_DOC_PLACEHOLDER why.
+ storageUpdateHandler.handleUpdateAll(
+ cmd.txId(),
+ cmd.rowsToUpdate(),
+ cmd.commitPartitionId().asReplicationGroupId(),
+ true,
+ null,
+ null,
+ indexIdsAtRwTxBeginTs(txId)
+ );
+
+ return applyCmdRespectingDelayedAck(cmd, skipDelayedAck);
} else {
return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
UpdateCommandResult updateCommandResult =
(UpdateCommandResult) res;