This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d97e9de7e9 IGNITE-28537 Remove duplicates from 
PartitionReplicaListener (#7985)
6d97e9de7e9 is described below

commit 6d97e9de7e9639586833322a497babdc5c95dec4
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Apr 14 12:59:14 2026 +0400

    IGNITE-28537 Remove duplicates from PartitionReplicaListener (#7985)
---
 .../replicator/PartitionReplicaListener.java       | 122 ++++++++-------------
 1 file changed, 46 insertions(+), 76 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0f816b845b0..f173d9ad228 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -122,6 +122,7 @@ import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBina
 import 
org.apache.ignite.internal.partition.replicator.network.command.TimedBinaryRowMessageBuilder;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateAllCommand;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommand;
+import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandBase;
 import 
org.apache.ignite.internal.partition.replicator.network.command.UpdateCommandV2Builder;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryRowMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.BinaryTupleMessage;
@@ -2622,48 +2623,22 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         );
 
         if (!cmd.full()) {
-            if (skipDelayedAck) {
-                if (!SKIP_UPDATES) {
-                    storageUpdateHandler.handleUpdate(
-                            cmd.txId(),
-                            cmd.rowUuid(),
-                            cmd.commitPartitionId().asReplicationGroupId(),
-                            cmd.rowToUpdate(),
-                            true,
-                            null,
-                            null,
-                            null,
-                            indexIdsAtRwTxBeginTs(txId)
-                    );
-                }
-
-                return applyCmdWithExceptionHandling(cmd).thenApply(res -> 
null);
-            } else {
-                if (!SKIP_UPDATES) {
-                    // We don't need to take the partition snapshots read 
lock, see #INTERNAL_DOC_PLACEHOLDER why.
-                    storageUpdateHandler.handleUpdate(
-                            cmd.txId(),
-                            cmd.rowUuid(),
-                            cmd.commitPartitionId().asReplicationGroupId(),
-                            cmd.rowToUpdate(),
-                            true,
-                            null,
-                            null,
-                            null,
-                            indexIdsAtRwTxBeginTs(txId)
-                    );
-                }
-
-                CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
-                    if (e != null) {
-                        throw new DelayedAckException(cmd.txId(), 
unwrapCause(e), txManager);
-                    }
-
-                    return cmd.txId();
-                });
-
-                return completedFuture(new CommandApplicationResult(null, 
repFut));
+            if (!SKIP_UPDATES) {
+                // We don't need to take the partition snapshots read lock, 
see #INTERNAL_DOC_PLACEHOLDER why.
+                storageUpdateHandler.handleUpdate(
+                        cmd.txId(),
+                        cmd.rowUuid(),
+                        cmd.commitPartitionId().asReplicationGroupId(),
+                        cmd.rowToUpdate(),
+                        true,
+                        null,
+                        null,
+                        null,
+                        indexIdsAtRwTxBeginTs(txId)
+                );
             }
+
+            return applyCmdRespectingDelayedAck(cmd, skipDelayedAck);
         } else {
             return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
                 UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res;
@@ -2737,6 +2712,24 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         );
     }
 
+    private CompletableFuture<CommandApplicationResult> 
applyCmdRespectingDelayedAck(UpdateCommandBase cmd, boolean skipDelayedAck) {
+        assert !cmd.full() : "Only non-full commands are supported here";
+
+        if (skipDelayedAck) {
+            return applyCmdWithExceptionHandling(cmd).thenApply(res -> null);
+        } else {
+            CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
+                if (e != null) {
+                    throw new DelayedAckException(cmd.txId(), unwrapCause(e), 
txManager);
+                }
+
+                return cmd.txId();
+            });
+
+            return completedFuture(new CommandApplicationResult(null, repFut));
+        }
+    }
+
     /**
      * Executes an UpdateAll command.
      *
@@ -2771,41 +2764,18 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         );
 
         if (!cmd.full()) {
-            if (skipDelayedAck) {
-                // We don't need to take the partition snapshots read lock, 
see #INTERNAL_DOC_PLACEHOLDER why.
-                storageUpdateHandler.handleUpdateAll(
-                        cmd.txId(),
-                        cmd.rowsToUpdate(),
-                        cmd.commitPartitionId().asReplicationGroupId(),
-                        true,
-                        null,
-                        null,
-                        indexIdsAtRwTxBeginTs(txId)
-                );
-
-                return applyCmdWithExceptionHandling(cmd).thenApply(res -> 
null);
-            } else {
-                // We don't need to take the partition snapshots read lock, 
see #INTERNAL_DOC_PLACEHOLDER why.
-                storageUpdateHandler.handleUpdateAll(
-                        cmd.txId(),
-                        cmd.rowsToUpdate(),
-                        cmd.commitPartitionId().asReplicationGroupId(),
-                        true,
-                        null,
-                        null,
-                        indexIdsAtRwTxBeginTs(txId)
-                );
-            }
-
-            CompletableFuture<UUID> repFut = 
applyCmdWithExceptionHandling(cmd).handle((r, e) -> {
-                if (e != null) {
-                    throw new DelayedAckException(cmd.txId(), unwrapCause(e), 
txManager);
-                }
-
-                return cmd.txId();
-            });
-
-            return completedFuture(new CommandApplicationResult(null, repFut));
+            // We don't need to take the partition snapshots read lock, see 
#INTERNAL_DOC_PLACEHOLDER why.
+            storageUpdateHandler.handleUpdateAll(
+                    cmd.txId(),
+                    cmd.rowsToUpdate(),
+                    cmd.commitPartitionId().asReplicationGroupId(),
+                    true,
+                    null,
+                    null,
+                    indexIdsAtRwTxBeginTs(txId)
+            );
+
+            return applyCmdRespectingDelayedAck(cmd, skipDelayedAck);
         } else {
             return applyCmdWithExceptionHandling(cmd).thenCompose(res -> {
                 UpdateCommandResult updateCommandResult = 
(UpdateCommandResult) res;

Reply via email to