This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0b7d3c4a3a5 IGNITE-27563 Properly close zone partition resources on stop (#7409)
0b7d3c4a3a5 is described below
commit 0b7d3c4a3a5dc39ab35b5d63db7f68d5170a0cdd
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Thu Jan 15 11:28:00 2026 +0400
IGNITE-27563 Properly close zone partition resources on stop (#7409)
---
.../PartitionReplicaLifecycleManager.java | 3 +
.../partition/replicator/ZoneResourcesManager.java | 21 ++++--
.../PartitionReplicaLifecycleManagerTest.java | 16 ++++-
.../replicator/ZoneResourcesManagerTest.java | 75 ++++++++++++++++++----
4 files changed, 94 insertions(+), 21 deletions(-)
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index d96464231e0..f87660dcb76 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -188,6 +188,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.LongPriorityQueue;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -774,10 +775,12 @@ public class PartitionReplicaLifecycleManager extends
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
var storageIndexTracker = new PendingComparableValuesTracker<Long,
Void>(0L);
+ var safeTimeTracker = new
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
ZonePartitionResources zoneResources =
zoneResourcesManager.allocateZonePartitionResources(
zonePartitionId,
partitionCount,
+ safeTimeTracker,
storageIndexTracker
);
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index f12603863c1..81cd7f337ae 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -28,7 +28,6 @@ import java.util.concurrent.Executor;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.close.ManuallyCloseable;
import org.apache.ignite.internal.failure.FailureProcessor;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.raft.ZonePartitionRaftListener;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.LogStorageAccessImpl;
@@ -102,6 +101,7 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
ZonePartitionResources allocateZonePartitionResources(
ZonePartitionId zonePartitionId,
int partitionCount,
+ SafeTimeValuesTracker safeTimeTracker,
PendingComparableValuesTracker<Long, Void> storageIndexTracker
) {
ZoneResources zoneResources = resourcesByZoneId.computeIfAbsent(
@@ -112,8 +112,6 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
TxStatePartitionStorage txStatePartitionStorage =
zoneResources.txStateStorage
.getOrCreatePartitionStorage(zonePartitionId.partitionId());
- var safeTimeTracker = new
SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
-
var raftGroupListener = new ZonePartitionRaftListener(
zonePartitionId,
txStatePartitionStorage,
@@ -176,8 +174,7 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
busyLock.block();
for (ZoneResources zoneResources : resourcesByZoneId.values()) {
- zoneResources.txStateStorage.close();
- zoneResources.resourcesByPartitionId.clear();
+ zoneResources.close();
}
resourcesByZoneId.clear();
@@ -255,6 +252,12 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
ZoneResources(TxStateStorage txStateStorage) {
this.txStateStorage = txStateStorage;
}
+
+ void close() {
+ txStateStorage.close();
+ resourcesByPartitionId.forEach((index, partitionResources) ->
partitionResources.close());
+ resourcesByPartitionId.clear();
+ }
}
/**
@@ -319,9 +322,17 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
return replicaListenerFuture;
}
+ /** Closes trackers. */
public void closeTrackers() {
safeTimeTracker.close();
storageIndexTracker.close();
}
+
+ /** Closes all resources. */
+ public void close() {
+ closeTrackers();
+ raftListener.onShutdown();
+ txStatePartitionStorage.close();
+ }
}
}
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 9a21318173e..0394c11a97f 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.partition.replicator;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.catalog.CatalogTestUtils.TEST_DELAY_DURATION;
@@ -53,6 +54,8 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.lang.annotation.Retention;
+import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -308,8 +311,12 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
}
@AfterEach
- void tearDown() {
- List<IgniteComponent> components =
List.of(partitionReplicaLifecycleManager, replicaManager, catalogManager,
metaStorageManager);
+ void tearDown(TestInfo testInfo) {
+ List<IgniteComponent> components = new
ArrayList<>(List.of(replicaManager, catalogManager, metaStorageManager));
+
+ if
(!testInfo.getTestMethod().orElseThrow().isAnnotationPresent(ManagerIsStoppedByTest.class))
{
+ components.add(0, partitionReplicaLifecycleManager);
+ }
components.forEach(IgniteComponent::beforeNodeStop);
@@ -421,6 +428,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
}
@Test
+ @ManagerIsStoppedByTest
public void
partitionLifecycleManagerStopsCorrectWhenTxStatePartitionStoragesAreStoppedExceptionally()
throws Exception {
doReturn(commonZonePartitionResources).when(zoneResourcesManager).getZonePartitionResources(any());
@@ -444,4 +452,8 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
defaultZoneResources.forEach(resources ->
verify(resources.txStatePartitionStorage(), atLeastOnce()).close());
}
+
+ @Retention(RUNTIME)
+ private @interface ManagerIsStoppedByTest {
+ }
}
diff --git a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
index 6d2b0c50cf7..99a1ab48328 100644
--- a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
+++ b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java
@@ -27,6 +27,8 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -36,6 +38,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.TopologyService;
@@ -49,6 +52,7 @@ import
org.apache.ignite.internal.testframework.InjectExecutorService;
import org.apache.ignite.internal.tx.TxManager;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
+import org.apache.ignite.internal.util.SafeTimeValuesTracker;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -63,9 +67,6 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
private ZoneResourcesManager manager;
- // TODO https://issues.apache.org/jira/browse/IGNITE-24654 Ensure that
tracker is closed.
- private PendingComparableValuesTracker<Long, Void> storageIndexTracker;
-
@BeforeEach
void init(
@Mock LogSyncer logSyncer,
@@ -97,12 +98,13 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
executor,
replicaManager
);
-
- storageIndexTracker = new PendingComparableValuesTracker<>(0L);
-
assertThat(sharedStorage.startAsync(new ComponentContext()),
willCompleteSuccessfully());
}
+ private static PendingComparableValuesTracker<Long, Void>
newStorageIndexTracker() {
+ return new PendingComparableValuesTracker<>(0L);
+ }
+
@AfterEach
void cleanup() {
manager.close();
@@ -112,7 +114,7 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
@Test
void allocatesResources() {
- ZonePartitionResources resources = allocatePartitionResources(new
ZonePartitionId(1, 1), 10, storageIndexTracker);
+ ZonePartitionResources resources = allocatePartitionResources(new
ZonePartitionId(1, 1), 10);
assertThat(resources.txStatePartitionStorage(), is(notNullValue()));
assertThat(resources.raftListener(), is(notNullValue()));
@@ -120,25 +122,60 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest
{
assertThat(resources.replicaListenerFuture().isDone(), is(false));
}
+ private static SafeTimeValuesTracker newSafeTimeTracker() {
+ return new SafeTimeValuesTracker(HybridTimestamp.MIN_VALUE);
+ }
+
@Test
void closesResourcesOnShutdown() {
- ZonePartitionResources zone1storage1 = allocatePartitionResources(new
ZonePartitionId(1, 1), 10, storageIndexTracker);
- ZonePartitionResources zone1storage5 = allocatePartitionResources(new
ZonePartitionId(1, 5), 10, storageIndexTracker);
- ZonePartitionResources zone2storage3 = allocatePartitionResources(new
ZonePartitionId(2, 3), 10, storageIndexTracker);
+ SafeTimeValuesTracker zone1SafeTimeTracker1 =
spy(newSafeTimeTracker());
+ SafeTimeValuesTracker zone1SafeTimeTracker5 =
spy(newSafeTimeTracker());
+ SafeTimeValuesTracker zone2SafeTimeTracker3 =
spy(newSafeTimeTracker());
+
+ PendingComparableValuesTracker<Long, Void> zone1IndexTracker1 =
spy(newStorageIndexTracker());
+ PendingComparableValuesTracker<Long, Void> zone1IndexTracker5 =
spy(newStorageIndexTracker());
+ PendingComparableValuesTracker<Long, Void> zone2IndexTracker3 =
spy(newStorageIndexTracker());
+
+ ZonePartitionResources zone1storage1 = allocatePartitionResources(
+ new ZonePartitionId(1, 1),
+ 10,
+ zone1SafeTimeTracker1,
+ zone1IndexTracker1
+ );
+ ZonePartitionResources zone1storage5 = allocatePartitionResources(
+ new ZonePartitionId(1, 5),
+ 10,
+ zone1SafeTimeTracker5,
+ zone1IndexTracker5
+ );
+ ZonePartitionResources zone2storage3 = allocatePartitionResources(
+ new ZonePartitionId(2, 3),
+ 10,
+ zone2SafeTimeTracker3,
+ zone2IndexTracker3
+ );
manager.close();
assertThatStorageIsStopped(zone1storage1);
assertThatStorageIsStopped(zone1storage5);
assertThatStorageIsStopped(zone2storage3);
+
+ verify(zone1SafeTimeTracker1).close();
+ verify(zone1SafeTimeTracker5).close();
+ verify(zone2SafeTimeTracker3).close();
+
+ verify(zone1IndexTracker1).close();
+ verify(zone1IndexTracker5).close();
+ verify(zone2IndexTracker3).close();
}
@Test
void removesTxStatePartitionStorageOnDestroy() {
int zoneId = 1;
- allocatePartitionResources(new ZonePartitionId(zoneId, 1), 10,
storageIndexTracker);
- allocatePartitionResources(new ZonePartitionId(zoneId, 2), 10,
storageIndexTracker);
+ allocatePartitionResources(new ZonePartitionId(zoneId, 1), 10);
+ allocatePartitionResources(new ZonePartitionId(zoneId, 2), 10);
assertThat(manager.txStatePartitionStorage(zoneId, 1),
is(notNullValue()));
assertThat(manager.txStatePartitionStorage(zoneId, 2),
is(notNullValue()));
@@ -156,8 +193,9 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
CompletableFuture<?>[] futures = IntStream.range(0, partCount)
.mapToObj(partId -> runAsync(
- () -> allocatePartitionResources(new
ZonePartitionId(zoneId, partId), partCount, storageIndexTracker), executor)
- )
+ () -> allocatePartitionResources(new
ZonePartitionId(zoneId, partId), partCount),
+ executor
+ ))
.toArray(CompletableFuture[]::new);
assertThat(allOf(futures), willCompleteSuccessfully());
@@ -172,14 +210,23 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest
{
);
}
+ private ZonePartitionResources allocatePartitionResources(
+ ZonePartitionId zonePartitionId,
+ int partitionCount
+ ) {
+ return allocatePartitionResources(zonePartitionId, partitionCount,
newSafeTimeTracker(), newStorageIndexTracker());
+ }
+
private ZonePartitionResources allocatePartitionResources(
ZonePartitionId zonePartitionId,
int partitionCount,
+ SafeTimeValuesTracker safeTimeTracker,
PendingComparableValuesTracker<Long, Void> storageIndexTracker
) {
return bypassingThreadAssertions(() ->
manager.allocateZonePartitionResources(
zonePartitionId,
partitionCount,
+ safeTimeTracker,
storageIndexTracker
));
}