This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f6beed5d580 IGNITE-27601 TxCleanupReadyFutureList is modified to
consume less memory (#7454)
f6beed5d580 is described below
commit f6beed5d5808705cb3faa05d16af46442bc0e897
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Jan 22 14:33:09 2026 +0400
IGNITE-27601 TxCleanupReadyFutureList is modified to consume less memory
(#7454)
---
.../replicator/PartitionReplicaListener.java | 69 ++++++++++++----------
1 file changed, 39 insertions(+), 30 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0129f60fafc..9bd4da24b5f 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -63,7 +63,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
-import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -502,12 +501,11 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue());
+ var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
return appendTxCommand(
req.transactionId(),
opId,
- req.requestType(),
req.full(),
() -> processSingleEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -515,12 +513,11 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
var req = (ReadWriteSingleRowPkReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue());
+ var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
return appendTxCommand(
req.transactionId(),
opId,
- req.requestType(),
req.full(),
() -> processSingleEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -528,12 +525,11 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
var req = (ReadWriteMultiRowReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue());
+ var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
return appendTxCommand(
req.transactionId(),
opId,
- req.requestType(),
req.full(),
() -> processMultiEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -541,12 +537,11 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
var req = (ReadWriteMultiRowPkReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue());
+ var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
return appendTxCommand(
req.transactionId(),
opId,
- req.requestType(),
req.full(),
() -> processMultiEntryAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -554,12 +549,11 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
} else if (request instanceof ReadWriteSwapRowReplicaRequest) {
var req = (ReadWriteSwapRowReplicaRequest) request;
- var opId = new OperationId(senderId, req.timestamp().longValue());
+ var opId = new OperationId(senderId, req.timestamp().longValue(),
req.requestType());
return appendTxCommand(
req.transactionId(),
opId,
- req.requestType(),
req.full(),
() -> processTwoEntriesAction(req,
replicaPrimacy.leaseStartTime()).whenComplete(
(r, e) -> setDelayedAckProcessor(r,
req.delayedAckProcessor()))
@@ -578,10 +572,10 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
.txLabel(req.txLabel())
.build());
- var opId = new OperationId(senderId, req.timestamp().longValue());
+ var opId = new OperationId(senderId, req.timestamp().longValue(),
RW_SCAN);
// Implicit RW scan can be committed locally on a last batch or
error.
- return appendTxCommand(req.transactionId(), opId, RW_SCAN, false,
() -> processScanRetrieveBatchAction(req))
+ return appendTxCommand(req.transactionId(), opId, false, () ->
processScanRetrieveBatchAction(req))
.thenCompose(rows -> {
if (allElementsAreNull(rows)) {
return completedFuture(rows);
@@ -1450,17 +1444,17 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
// The reason for the forced switch is that otherwise write
intents would not be switched (if there is no volatile state and
// FuturesCleanupResult.hadUpdateFutures() returns false).
- forceCleanup.set(txOps.futures.isEmpty());
+ forceCleanup.set(txOps.isEmpty());
- txOps.futures.forEach((opType, futures) -> {
- if (opType.isRwRead()) {
- txReadFutures.addAll(futures.values());
+ txOps.futures.forEach((opId, future) -> {
+ if (opId.requestType.isRwRead()) {
+ txReadFutures.add(future);
} else {
- txUpdateFutures.addAll(futures.values());
+ txUpdateFutures.add(future);
}
});
- txOps.futures.clear();
+ txOps.clear();
return null;
});
@@ -1575,7 +1569,6 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
*
* @param txId Transaction id.
* @param opId Operation id.
- * @param cmdType Command type.
* @param full {@code True} if a full transaction and can be immediately
committed.
* @param op Operation closure.
* @return A future object representing the result of the given operation.
@@ -1583,7 +1576,6 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
private <T> CompletableFuture<T> appendTxCommand(
UUID txId,
OperationId opId,
- RequestType cmdType,
boolean full,
Supplier<CompletableFuture<T>> op
) {
@@ -1612,7 +1604,7 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
txOps = new TxCleanupReadyFutureList();
}
- txOps.futures.computeIfAbsent(cmdType, type -> new
HashMap<>()).put(opId, cleanupReadyFut);
+ txOps.putOrReplaceFuture(opId, cleanupReadyFut);
return txOps;
});
@@ -3637,10 +3629,19 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
* {@code state} that represents a transaction state.
*/
private static class TxCleanupReadyFutureList {
- /**
- * Operation type is mapped operation futures.
- */
- final Map<RequestType, Map<OperationId, CompletableFuture<?>>> futures
= new EnumMap<>(RequestType.class);
+ final Map<OperationId, CompletableFuture<?>> futures = new HashMap<>();
+
+ boolean isEmpty() {
+ return futures.isEmpty();
+ }
+
+ void clear() {
+ futures.clear();
+ }
+
+ void putOrReplaceFuture(OperationId opId, CompletableFuture<?> fut) {
+ futures.put(opId, fut);
+ }
}
@Override
@@ -3857,10 +3858,13 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
*/
private static class OperationId {
/** Operation node initiator id. */
- private UUID initiatorId;
+ private final UUID initiatorId;
/** Timestamp. */
- private long ts;
+ private final long ts;
+
+ /** Request typ. */
+ private final RequestType requestType;
/**
* The constructor.
@@ -3868,9 +3872,10 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
* @param initiatorId Sender node id.
* @param ts Timestamp.
*/
- public OperationId(UUID initiatorId, long ts) {
+ public OperationId(UUID initiatorId, long ts, RequestType requestType)
{
this.initiatorId = initiatorId;
this.ts = ts;
+ this.requestType = requestType;
}
@Override
@@ -3887,13 +3892,17 @@ public class PartitionReplicaListener implements
ReplicaTableProcessor {
if (ts != that.ts) {
return false;
}
+ if (requestType != that.requestType) {
+ return false;
+ }
return initiatorId.equals(that.initiatorId);
}
@Override
public int hashCode() {
int result = initiatorId.hashCode();
- result = 31 * result + (int) (ts ^ (ts >>> 32));
+ result = 31 * result + Long.hashCode(ts);
+ result = 31 * result + requestType.hashCode();
return result;
}
}