This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f6beed5d580 IGNITE-27601 TxCleanupReadyFutureList is modified to 
consume less memory (#7454)
f6beed5d580 is described below

commit f6beed5d5808705cb3faa05d16af46442bc0e897
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Jan 22 14:33:09 2026 +0400

    IGNITE-27601 TxCleanupReadyFutureList is modified to consume less memory 
(#7454)
---
 .../replicator/PartitionReplicaListener.java       | 69 ++++++++++++----------
 1 file changed, 39 insertions(+), 30 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0129f60fafc..9bd4da24b5f 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -63,7 +63,6 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.Collection;
-import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -502,12 +501,11 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         if (request instanceof ReadWriteSingleRowReplicaRequest) {
             var req = (ReadWriteSingleRowReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue());
+            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
 
             return appendTxCommand(
                     req.transactionId(),
                     opId,
-                    req.requestType(),
                     req.full(),
                     () -> processSingleEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -515,12 +513,11 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
             var req = (ReadWriteSingleRowPkReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue());
+            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
 
             return appendTxCommand(
                     req.transactionId(),
                     opId,
-                    req.requestType(),
                     req.full(),
                     () -> processSingleEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -528,12 +525,11 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
             var req = (ReadWriteMultiRowReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue());
+            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
 
             return appendTxCommand(
                     req.transactionId(),
                     opId,
-                    req.requestType(),
                     req.full(),
                     () -> processMultiEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -541,12 +537,11 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
             var req = (ReadWriteMultiRowPkReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue());
+            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
 
             return appendTxCommand(
                     req.transactionId(),
                     opId,
-                    req.requestType(),
                     req.full(),
                     () -> processMultiEntryAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -554,12 +549,11 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
         } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
             var req = (ReadWriteSwapRowReplicaRequest) request;
 
-            var opId = new OperationId(senderId, req.timestamp().longValue());
+            var opId = new OperationId(senderId, req.timestamp().longValue(), 
req.requestType());
 
             return appendTxCommand(
                     req.transactionId(),
                     opId,
-                    req.requestType(),
                     req.full(),
                     () -> processTwoEntriesAction(req, 
replicaPrimacy.leaseStartTime()).whenComplete(
                             (r, e) -> setDelayedAckProcessor(r, 
req.delayedAckProcessor()))
@@ -578,10 +572,10 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                     .txLabel(req.txLabel())
                     .build());
 
-            var opId = new OperationId(senderId, req.timestamp().longValue());
+            var opId = new OperationId(senderId, req.timestamp().longValue(), 
RW_SCAN);
 
             // Implicit RW scan can be committed locally on a last batch or 
error.
-            return appendTxCommand(req.transactionId(), opId, RW_SCAN, false, 
() -> processScanRetrieveBatchAction(req))
+            return appendTxCommand(req.transactionId(), opId, false, () -> 
processScanRetrieveBatchAction(req))
                     .thenCompose(rows -> {
                         if (allElementsAreNull(rows)) {
                             return completedFuture(rows);
@@ -1450,17 +1444,17 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
 
             // The reason for the forced switch is that otherwise write 
intents would not be switched (if there is no volatile state and
             // FuturesCleanupResult.hadUpdateFutures() returns false).
-            forceCleanup.set(txOps.futures.isEmpty());
+            forceCleanup.set(txOps.isEmpty());
 
-            txOps.futures.forEach((opType, futures) -> {
-                if (opType.isRwRead()) {
-                    txReadFutures.addAll(futures.values());
+            txOps.futures.forEach((opId, future) -> {
+                if (opId.requestType.isRwRead()) {
+                    txReadFutures.add(future);
                 } else {
-                    txUpdateFutures.addAll(futures.values());
+                    txUpdateFutures.add(future);
                 }
             });
 
-            txOps.futures.clear();
+            txOps.clear();
 
             return null;
         });
@@ -1575,7 +1569,6 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
      *
      * @param txId Transaction id.
      * @param opId Operation id.
-     * @param cmdType Command type.
      * @param full {@code True} if a full transaction and can be immediately 
committed.
      * @param op Operation closure.
      * @return A future object representing the result of the given operation.
@@ -1583,7 +1576,6 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
     private <T> CompletableFuture<T> appendTxCommand(
             UUID txId,
             OperationId opId,
-            RequestType cmdType,
             boolean full,
             Supplier<CompletableFuture<T>> op
     ) {
@@ -1612,7 +1604,7 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
                 txOps = new TxCleanupReadyFutureList();
             }
 
-            txOps.futures.computeIfAbsent(cmdType, type -> new 
HashMap<>()).put(opId, cleanupReadyFut);
+            txOps.putOrReplaceFuture(opId, cleanupReadyFut);
 
             return txOps;
         });
@@ -3637,10 +3629,19 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
      * {@code state} that represents a transaction state.
      */
     private static class TxCleanupReadyFutureList {
-        /**
-         * Operation type is mapped operation futures.
-         */
-        final Map<RequestType, Map<OperationId, CompletableFuture<?>>> futures 
= new EnumMap<>(RequestType.class);
+        final Map<OperationId, CompletableFuture<?>> futures = new HashMap<>();
+
+        boolean isEmpty() {
+            return futures.isEmpty();
+        }
+
+        void clear() {
+            futures.clear();
+        }
+
+        void putOrReplaceFuture(OperationId opId, CompletableFuture<?> fut) {
+            futures.put(opId, fut);
+        }
     }
 
     @Override
@@ -3857,10 +3858,13 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
      */
     private static class OperationId {
         /** Operation node initiator id. */
-        private UUID initiatorId;
+        private final UUID initiatorId;
 
         /** Timestamp. */
-        private long ts;
+        private final long ts;
+
+        /** Request type. */
+        private final RequestType requestType;
 
         /**
          * The constructor.
@@ -3868,9 +3872,10 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
          * @param initiatorId Sender node id.
          * @param ts Timestamp.
          */
-        public OperationId(UUID initiatorId, long ts) {
+        public OperationId(UUID initiatorId, long ts, RequestType requestType) 
{
             this.initiatorId = initiatorId;
             this.ts = ts;
+            this.requestType = requestType;
         }
 
         @Override
@@ -3887,13 +3892,17 @@ public class PartitionReplicaListener implements 
ReplicaTableProcessor {
             if (ts != that.ts) {
                 return false;
             }
+            if (requestType != that.requestType) {
+                return false;
+            }
             return initiatorId.equals(that.initiatorId);
         }
 
         @Override
         public int hashCode() {
             int result = initiatorId.hashCode();
-            result = 31 * result + (int) (ts ^ (ts >>> 32));
+            result = 31 * result + Long.hashCode(ts);
+            result = 31 * result + requestType.hashCode();
             return result;
         }
     }

Reply via email to