This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 652a4d5da83 IGNITE-28543 Throttle VacuumTxStateReplicaRequests (#7987)
652a4d5da83 is described below
commit 652a4d5da8344afbb03c8937eb4341df7c794931
Author: Cyrill <[email protected]>
AuthorDate: Wed Apr 15 10:43:31 2026 +0300
IGNITE-28543 Throttle VacuumTxStateReplicaRequests (#7987)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../tx/impl/PersistentTxStateVacuumizer.java | 85 +++++++---
.../tx/impl/PersistentTxStateVacuumizerTest.java | 178 +++++++++++++++++++++
2 files changed, 239 insertions(+), 24 deletions(-)
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
index 159483b0a9a..21b681ec893 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -25,6 +25,7 @@ import static
org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -60,6 +61,9 @@ public class PersistentTxStateVacuumizer {
private static final String VACUUM_THROTTLE_KEY = "vacuum-failed";
+ /** Maximum number of transaction IDs per vacuum request to avoid
serialization timeouts. */
+ static final int VACUUM_BATCH_SIZE = 1000;
+
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new
TxMessagesFactory();
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
@@ -134,30 +138,13 @@ public class PersistentTxStateVacuumizer {
return nullCompletedFuture();
}
- VacuumTxStateReplicaRequest request =
TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
-
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
-
.groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartitionId))
- .transactionIds(filteredTxIds)
- .build();
-
- return replicaService.invoke(localNode,
request).whenComplete((v, e) -> {
- if (e == null) {
- successful.addAll(filteredTxIds);
-
vacuumizedPersistentTxnStatesCount.addAndGet(filteredTxIds.size());
- } else if (expectedException(e)) {
- // We can log the exceptions without
further handling because failed requests' txns are not added
- // to the set of successful and will be
retried. PrimaryReplicaMissException can be considered as
- // a part of regular flow and doesn't need
to be logged. NodeStoppingException should be ignored as
- // vacuumization will be retried after
restart.
- LOG.debug("Failed to vacuum tx states from
the persistent storage.", e);
- } else {
- // In general, even though this vacuum
round has completed unsuccessfully,
- // due to ReplicationTimeoutException for
instance,
- // it does not mean that correctness is
violated, and we need to shutdown the node.
- // Perhaps the next attempt will be
successful.
- throttledLogger.warn(VACUUM_THROTTLE_KEY,
"Failed to vacuum tx states from the persistent storage.", e);
- }
- });
+ return sendBatchedVacuumRequests(
+ replicaMeta.getStartTime().longValue(),
+ commitPartitionId,
+ filteredTxIds,
+ successful,
+ vacuumizedPersistentTxnStatesCount
+ );
} else {
successful.addAll(txs.stream().map(v ->
v.txId).collect(toSet()));
@@ -172,6 +159,56 @@ public class PersistentTxStateVacuumizer {
.handle((unused, unusedEx) -> new
PersistentTxStateVacuumResult(successful,
vacuumizedPersistentTxnStatesCount.get()));
}
+ private CompletableFuture<Void> sendBatchedVacuumRequests(
+ long enlistmentConsistencyToken,
+ ZonePartitionId commitPartitionId,
+ Set<UUID> txIds,
+ Set<UUID> successful,
+ AtomicInteger vacuumizedCount
+ ) {
+ List<CompletableFuture<?>> batchFutures = new ArrayList<>();
+ Iterator<UUID> it = txIds.iterator();
+
+ while (it.hasNext()) {
+ Set<UUID> batch = new HashSet<>(Math.min(VACUUM_BATCH_SIZE,
txIds.size()));
+
+ for (int j = 0; j < VACUUM_BATCH_SIZE && it.hasNext(); j++) {
+ batch.add(it.next());
+ }
+
+ batchFutures.add(sendVacuumBatch(enlistmentConsistencyToken,
commitPartitionId, batch, successful, vacuumizedCount));
+ }
+
+ return allOf(batchFutures);
+ }
+
+ private CompletableFuture<?> sendVacuumBatch(
+ long enlistmentConsistencyToken,
+ ZonePartitionId commitPartitionId,
+ Set<UUID> batch,
+ Set<UUID> successful,
+ AtomicInteger vacuumizedCount
+ ) {
+ VacuumTxStateReplicaRequest request =
TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
+ .enlistmentConsistencyToken(enlistmentConsistencyToken)
+ .groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
commitPartitionId))
+ .transactionIds(batch)
+ .build();
+
+ return replicaService.invoke(localNode, request).whenComplete((v, e)
-> {
+ if (e == null) {
+ successful.addAll(batch);
+ vacuumizedCount.addAndGet(batch.size());
+ } else if (expectedException(e)) {
+ // Failed requests' txns are not added to the set of
successful and will be retried.
+ LOG.debug("Failed to vacuum tx states from the persistent
storage.", e);
+ } else {
+ throttledLogger.warn(VACUUM_THROTTLE_KEY,
+ "Failed to vacuum tx states from the persistent
storage.", e);
+ }
+ });
+ }
+
private static boolean expectedException(Throwable e) {
return hasCause(e,
PrimaryReplicaMissException.class,
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizerTest.java
new file mode 100644
index 00000000000..a6386c227aa
--- /dev/null
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx.impl;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static
org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VACUUM_BATCH_SIZE;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.TestClockService;
+import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import
org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.PersistentTxStateVacuumResult;
+import
org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VacuumizableTx;
+import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class PersistentTxStateVacuumizerTest extends BaseIgniteAbstractTest {
+ @Mock
+ private ReplicaService replicaService;
+
+ @Mock
+ private InternalClusterNode localNode;
+
+ @Mock
+ private PlacementDriver placementDriver;
+
+ @Captor
+ private ArgumentCaptor<VacuumTxStateReplicaRequest> requestCaptor;
+
+ private PersistentTxStateVacuumizer vacuumizer;
+
+ @BeforeEach
+ void setUp() {
+ UUID localNodeId = UUID.randomUUID();
+ when(localNode.id()).thenReturn(localNodeId);
+ when(localNode.name()).thenReturn("test-node");
+
+ TestReplicaMetaImpl replicaMeta = new
TestReplicaMetaImpl(localNode.name(), localNodeId);
+
+ when(placementDriver.getPrimaryReplica(any(), any()))
+ .thenReturn(completedFuture(replicaMeta));
+ lenient().when(replicaService.invoke(any(InternalClusterNode.class),
requestCaptor.capture()))
+ .thenReturn(completedFuture(null));
+
+ vacuumizer = new PersistentTxStateVacuumizer(
+ replicaService,
+ localNode,
+ new TestClockService(new HybridClockImpl()),
+ placementDriver
+ );
+ }
+
+ @Test
+ void smallBatchSentAsSingleRequest() {
+ int count = 10;
+ Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);
+
+ PersistentTxStateVacuumResult result =
vacuumizer.vacuumPersistentTxStates(txIds).join();
+
+ assertEquals(count, result.vacuumizedPersistentTxnStatesCount);
+ assertEquals(count, result.txnsToVacuum.size());
+ assertEquals(1, requestCaptor.getAllValues().size());
+ assertEquals(count, requestCaptor.getValue().transactionIds().size());
+ }
+
+ @Test
+ void largeBatchIsSplitIntoMultipleRequests() {
+ int count = VACUUM_BATCH_SIZE * 3 + 1;
+ Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);
+
+ PersistentTxStateVacuumResult result =
vacuumizer.vacuumPersistentTxStates(txIds).join();
+
+ assertEquals(count, result.vacuumizedPersistentTxnStatesCount);
+ assertEquals(count, result.txnsToVacuum.size());
+
+ // Should be split into 4 requests.
+ assertEquals(4, requestCaptor.getAllValues().size());
+
+ // Each request should have at most VACUUM_BATCH_SIZE tx IDs.
+ assertThat(
+ requestCaptor.getAllValues().stream().map(r ->
r.transactionIds().size()).toList(),
+ everyItem(lessThanOrEqualTo(VACUUM_BATCH_SIZE))
+ );
+
+ // All tx IDs should be covered.
+ Set<UUID> allSentIds = new HashSet<>();
+ for (VacuumTxStateReplicaRequest req : requestCaptor.getAllValues()) {
+ allSentIds.addAll(req.transactionIds());
+ }
+ assertEquals(count, allSentIds.size());
+ }
+
+ @Test
+ void partialBatchFailureDoesNotAffectOtherBatches() {
+ int count = VACUUM_BATCH_SIZE * 2;
+ Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);
+
+ // First invocation succeeds, second fails.
+ when(replicaService.invoke(any(InternalClusterNode.class),
any(VacuumTxStateReplicaRequest.class)))
+ .thenReturn(completedFuture(null))
+ .thenReturn(CompletableFuture.failedFuture(new
RuntimeException("test failure")));
+
+ PersistentTxStateVacuumResult result =
vacuumizer.vacuumPersistentTxStates(txIds).join();
+
+ // Only the first batch's tx IDs should be in the successful set.
+ assertEquals(VACUUM_BATCH_SIZE,
result.vacuumizedPersistentTxnStatesCount);
+ assertEquals(VACUUM_BATCH_SIZE, result.txnsToVacuum.size());
+ }
+
+ @Test
+ void txsWithoutCleanupTimestampAreSuccessfulWithoutRequest() {
+ ZonePartitionId partitionId = new ZonePartitionId(1, 0);
+ Set<VacuumizableTx> txs = new HashSet<>();
+
+ // Tx without cleanup timestamp — should go directly to successful.
+ UUID txWithoutCleanup = UUID.randomUUID();
+ txs.add(new VacuumizableTx(txWithoutCleanup, null));
+
+ Map<ZonePartitionId, Set<VacuumizableTx>> txIds = Map.of(partitionId,
txs);
+
+ PersistentTxStateVacuumResult result =
vacuumizer.vacuumPersistentTxStates(txIds).join();
+
+ assertTrue(result.txnsToVacuum.contains(txWithoutCleanup));
+ assertEquals(0, result.vacuumizedPersistentTxnStatesCount);
+ // No requests should be sent since the only tx has no cleanup
timestamp.
+ assertEquals(0, requestCaptor.getAllValues().size());
+ }
+
+ private static Map<ZonePartitionId, Set<VacuumizableTx>> createTxIds(int
count) {
+ ZonePartitionId partitionId = new ZonePartitionId(1, 0);
+ Set<VacuumizableTx> txs = new HashSet<>();
+
+ for (int i = 0; i < count; i++) {
+ txs.add(new VacuumizableTx(UUID.randomUUID(), 1L));
+ }
+
+ return Map.of(partitionId, txs);
+ }
+}