This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 87aafb41db1 IGNITE-24345 Defer zone destruction after drop (#7322)
87aafb41db1 is described below
commit 87aafb41db11ed213e795bbf0be3b976bc2845e2
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Dec 29 21:21:17 2025 +0200
IGNITE-24345 Defer zone destruction after drop (#7322)
Co-authored-by: Roman Puchkovskiy <[email protected]>
---
.../handler/ClientPrimaryReplicaTracker.java | 4 +-
.../ignite/internal/util}/LongPriorityQueue.java | 4 +-
.../internal/util}/LongPriorityQueueSelfTest.java | 2 +-
.../rebalance/ItRebalanceDistributedTest.java | 2 +-
.../distributionzones/DataNodesManager.java | 20 +-
.../distributionzones/DistributionZoneManager.java | 14 +-
...ibutionZoneManagerConfigurationChangesTest.java | 15 --
...istributionZoneManagerScaleUpScaleDownTest.java | 66 -----
.../index/ChangeIndexStatusTaskController.java | 2 +-
.../apache/ignite/internal/index/IndexManager.java | 2 +-
.../PartitionReplicaLifecycleManager.java | 255 +++++++++++++++++-
.../replicator/StartedReplicationGroups.java | 19 ++
.../replicator/ZonePartitionReplicaListener.java | 7 +
.../partition/replicator/ZoneResourcesManager.java | 19 ++
.../replicator/raft/ZonePartitionRaftListener.java | 9 +
.../raft/snapshot/PartitionSnapshotStorage.java | 9 +
.../engine/statistic/SqlStatisticManagerImpl.java | 2 +-
.../partition/ItPartitionDestructionTest.java | 285 +++++++++++++++++----
.../internal/table/distributed/TableManager.java | 46 +---
19 files changed, 590 insertions(+), 192 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
index f667a4cad70..e0cc976a9d8 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
@@ -52,9 +52,9 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.LongPriorityQueue;
import org.apache.ignite.lang.TableNotFoundException;
import org.jetbrains.annotations.Nullable;
@@ -341,7 +341,7 @@ public class ClientPrimaryReplicaTracker {
private void onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
inBusyLockSafe(busyLock, () -> {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-24345 -
support zone destruction.
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-25017 -
support zone destruction.
int earliestVersion =
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
List<DestroyTableEvent> events =
destructionEventsQueue.drainUpTo(earliestVersion);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/LongPriorityQueue.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/LongPriorityQueue.java
similarity index 95%
rename from
modules/table/src/main/java/org/apache/ignite/internal/table/LongPriorityQueue.java
rename to
modules/core/src/main/java/org/apache/ignite/internal/util/LongPriorityQueue.java
index 7c73f382569..bf24227a3c4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/LongPriorityQueue.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/LongPriorityQueue.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.util;
import java.util.ArrayList;
import java.util.Comparator;
@@ -24,7 +24,7 @@ import java.util.PriorityQueue;
import java.util.function.ToLongFunction;
/**
- * A thread-safe wrapper over {@link java.util.PriorityQueue}, which uses
{@code long} value for ordering.
+ * A thread-safe wrapper over {@link PriorityQueue}, which uses {@code long}
value for ordering.
* The implementation provides a method to poll top item up to the given
priority.
*
* @param <T> Item type.
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/LongPriorityQueueSelfTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/LongPriorityQueueSelfTest.java
similarity index 99%
rename from
modules/table/src/test/java/org/apache/ignite/internal/table/LongPriorityQueueSelfTest.java
rename to
modules/core/src/test/java/org/apache/ignite/internal/util/LongPriorityQueueSelfTest.java
index 3a52ebd6980..22656443cc8 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/LongPriorityQueueSelfTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/LongPriorityQueueSelfTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.util;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 98d8d7aa0fb..1cdac08bce0 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -658,7 +658,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
@Test
@UseTestTxStateStorage
@UseRocksMetaStorage
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-24345")
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-27467")
void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo)
throws Exception {
Node node = getNode(0);
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
index d4777864412..b6357034452 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
@@ -1389,8 +1389,8 @@ public class DataNodesManager {
}
}
- CompletableFuture<?> onZoneDrop(int zoneId, HybridTimestamp timestamp) {
- return removeDataNodesKeys(zoneId, timestamp)
+ CompletableFuture<?> onZoneDestroy(int zoneId, int dropZoneCatalogVersion)
{
+ return removeDataNodesKeys(zoneId, dropZoneCatalogVersion)
.thenRun(() -> {
ZoneTimers zt = zoneTimers.remove(zoneId);
if (zt != null) {
@@ -1403,9 +1403,9 @@ public class DataNodesManager {
* Method deletes data nodes related values for the specified zone.
*
* @param zoneId Unique id of a zone.
- * @param timestamp Timestamp of an event that has triggered this method.
+ * @param dropZoneCatalogVersion Catalog version in which the zone was dropped.
*/
- private CompletableFuture<?> removeDataNodesKeys(int zoneId,
HybridTimestamp timestamp) {
+ private CompletableFuture<?> removeDataNodesKeys(int zoneId, int
dropZoneCatalogVersion) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -1414,7 +1414,7 @@ public class DataNodesManager {
Condition condition = exists(zoneDataNodesHistoryKey(zoneId));
Update removeKeysUpd = ops(
- // TODO remove(zoneDataNodesHistoryKey(zoneId)),
https://issues.apache.org/jira/browse/IGNITE-24345
+ remove(zoneDataNodesHistoryKey(zoneId)),
remove(zoneScaleUpTimerKey(zoneId)),
remove(zoneScaleDownTimerKey(zoneId))
).yield(true);
@@ -1427,16 +1427,18 @@ public class DataNodesManager {
if (e != null) {
if (!relatesToNodeStopping(e)) {
String errorMessage = String.format(
- "Failed to delete zone's dataNodes
keys [zoneId = %s, timestamp = %s]",
+ "Failed to delete zone's dataNodes
keys [zoneId = %s, dropZoneCatalogVersion = %s]",
zoneId,
- timestamp
+ dropZoneCatalogVersion
);
failureProcessor.process(new FailureContext(e,
errorMessage));
}
} else if (invokeResult) {
- LOG.info("Delete zone's dataNodes keys [zoneId =
{}, timestamp = {}]", zoneId, timestamp);
+ LOG.info("Delete zone's dataNodes keys [zoneId =
{}, dropZoneCatalogVersion = {}]", zoneId,
+ dropZoneCatalogVersion);
} else {
- LOG.debug("Failed to delete zone's dataNodes keys
[zoneId = {}, timestamp = {}]", zoneId, timestamp);
+ LOG.debug("Failed to delete zone's dataNodes keys
[zoneId = {}, dropZoneCatalogVersion = {}]", zoneId,
+ dropZoneCatalogVersion);
}
});
} finally {
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index b0c686f9975..194ac46b6f6 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -51,6 +51,7 @@ import static
org.apache.ignite.internal.metastorage.dsl.Statements.iif;
import static
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
import static
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -803,7 +804,8 @@ public class DistributionZoneManager extends
}));
catalogManager.listen(ZONE_DROP, (DropZoneEventParameters parameters)
-> inBusyLock(busyLock, () -> {
- return onDropZoneBusy(parameters).thenApply((ignored) -> false);
+ onDropZoneBusy(parameters);
+ return falseCompletedFuture();
}));
catalogManager.listen(ZONE_ALTER, new
ManagerCatalogAlterZoneEventListener());
@@ -891,14 +893,12 @@ public class DistributionZoneManager extends
return nullCompletedFuture();
}
- private CompletableFuture<?> onDropZoneBusy(DropZoneEventParameters
parameters) {
+ private void onDropZoneBusy(DropZoneEventParameters parameters) {
unregisterMetricSource(parameters.zoneId());
+ }
- long causalityToken = parameters.causalityToken();
-
- HybridTimestamp timestamp =
metaStorageManager.timestampByRevisionLocally(causalityToken);
-
- return dataNodesManager.onZoneDrop(parameters.zoneId(), timestamp);
+ public CompletableFuture<?> onDropZoneDestroy(int zoneId, int
dropZoneCatalogVersion) {
+ return dataNodesManager.onZoneDestroy(zoneId, dropZoneCatalogVersion);
}
private class ManagerCatalogAlterZoneEventListener extends
CatalogAlterZoneEventListener {
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index e0b4a550adb..c829c44ee6f 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -73,21 +73,6 @@ public class DistributionZoneManagerConfigurationChangesTest
extends BaseDistrib
assertZonesKeysInMetaStorage(zoneId, nodes);
}
- @ParameterizedTest
- @EnumSource(ConsistencyMode.class)
- void testZoneDeleteRemovesMetaStorageKey(ConsistencyMode consistencyMode)
throws Exception {
- createZone(ZONE_NAME, consistencyMode);
-
- int zoneId = getZoneId(ZONE_NAME);
-
- assertDataNodesFromLogicalNodesInStorage(zoneId, nodes,
keyValueStorage);
-
- dropZone(ZONE_NAME);
-
- // Data nodes should be removed from meta storage
- assertZonesKeysInMetaStorage(zoneId, null, false);
- }
-
@ParameterizedTest
@EnumSource(ConsistencyMode.class)
void testSeveralZoneCreationsUpdatesTriggerKey(ConsistencyMode
consistencyMode) throws Exception {
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
index 421e926b745..36d48b5c091 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
@@ -26,7 +26,6 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesTest
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesInStorage;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopology;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createDefaultZone;
-import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.dataNodeHistoryContext;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.logicalNodeFromNode;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
@@ -46,7 +45,6 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DataNodesHistoryContext;
import
org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
import
org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -221,70 +219,6 @@ public class DistributionZoneManagerScaleUpScaleDownTest
extends BaseDistributio
assertDataNodesFromLogicalNodesInStorage(defaultZone.id(),
clusterNodes2, keyValueStorage);
}
- @Test
- void testDropZoneDoNotPropagateDataNodesAfterScaleUp() throws Exception {
- startDistributionZoneManager();
-
- topology.putNode(NODE_A);
-
- topology.putNode(NODE_B);
-
- Set<LogicalNode> clusterNodes2 = Set.of(NODE_A, NODE_B);
-
- assertLogicalTopology(clusterNodes2, keyValueStorage);
-
- createZone(ZONE_NAME, IMMEDIATE_TIMER_VALUE, null, null);
-
- int zoneId = getZoneId(ZONE_NAME);
-
- assertDataNodesFromLogicalNodesInStorage(zoneId, clusterNodes2,
keyValueStorage);
-
- DataNodesHistoryContext context =
dataNodeHistoryContext(metaStorageManager, zoneId);
- assertTrue(context.scaleUpTimerPresent());
- assertTrue(context.scaleDownTimerPresent());
-
- dropZone(ZONE_NAME);
-
- // Data nodes history should not be dropped after zone drop. Deferred
removal should happen on LWM move.
- assertDataNodesFromLogicalNodesInStorage(zoneId, clusterNodes2,
keyValueStorage);
- context = dataNodeHistoryContext(metaStorageManager, zoneId);
- assertFalse(context.scaleUpTimerPresent());
- assertFalse(context.scaleDownTimerPresent());
- }
-
- @Test
- void testDropZoneDoNotPropagateDataNodesAfterScaleDown() throws Exception {
- startDistributionZoneManager();
-
- topology.putNode(NODE_A);
-
- topology.putNode(NODE_B);
-
- topology.removeNodes(Set.of(NODE_B));
-
- Set<LogicalNode> clusterNodes2 = Set.of(NODE_A);
-
- assertLogicalTopology(clusterNodes2, keyValueStorage);
-
- createZone(ZONE_NAME, null, IMMEDIATE_TIMER_VALUE, null);
-
- int zoneId = getZoneId(ZONE_NAME);
-
- assertDataNodesFromLogicalNodesInStorage(zoneId, clusterNodes2,
keyValueStorage);
-
- DataNodesHistoryContext context =
dataNodeHistoryContext(metaStorageManager, zoneId);
- assertTrue(context.scaleUpTimerPresent());
- assertTrue(context.scaleDownTimerPresent());
-
- dropZone(ZONE_NAME);
-
- // Data nodes history should not be dropped after zone drop. Deferred
removal should happen on LWM move.
- assertDataNodesFromLogicalNodesInStorage(zoneId, clusterNodes2,
keyValueStorage);
- context = dataNodeHistoryContext(metaStorageManager, zoneId);
- assertFalse(context.scaleUpTimerPresent());
- assertFalse(context.scaleDownTimerPresent());
- }
-
@Test
void testEmptyDataNodesOnStart() throws Exception {
startDistributionZoneManager();
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java
index 5ff838153ff..204bb01fdca 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java
@@ -49,8 +49,8 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.LongPriorityQueue;
/**
* Component that reacts to certain Catalog changes and starts or stops
corresponding {@link ChangeIndexStatusTask}s via the
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index fc813940d31..cef5ae464b6 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -59,11 +59,11 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.index.IndexStorage;
-import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.PartitionSet;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.LongPriorityQueue;
/**
* An Ignite component that is responsible for handling index-related commands
like CREATE or DROP as well as managing indexes' lifecycle.
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 8316ee50346..c4b66184794 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -29,6 +29,7 @@ import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
+import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_DROP;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES;
@@ -36,13 +37,18 @@ import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalan
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.assignmentsChainKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZonePartitionId;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingChangeTriggerKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.plannedPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchAppendKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchReduceKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsChainGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePendingAssignmentsGetLocally;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneStableAssignmentsGetLocally;
+import static org.apache.ignite.internal.event.EventListener.fromConsumer;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
@@ -64,6 +70,7 @@ import static
org.apache.ignite.internal.tostring.IgniteToStringBuilder.COLLECTI
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -97,6 +104,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescr
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
+import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
import org.apache.ignite.internal.components.NodeProperties;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import
org.apache.ignite.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
@@ -118,6 +126,8 @@ import
org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
+import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.Entry;
@@ -173,10 +183,12 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import
org.apache.ignite.internal.tx.storage.state.TxStateStorageRebalanceException;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.LongPriorityQueue;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.jetbrains.annotations.Nullable;
@@ -280,12 +292,18 @@ public class PartitionReplicaLifecycleManager extends
private final EventListener<CreateZoneEventParameters>
onCreateZoneListener = this::onCreateZone;
private final EventListener<PrimaryReplicaEventParameters>
onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
+ private final EventListener<DropZoneEventParameters> onZoneDropListener =
fromConsumer(this::onZoneDrop);
+
+ private final EventListener<ChangeLowWatermarkEventParameters>
onLowWatermarkChangedListener = this::onLwmChanged;
/**
* This future completes on {@link #beforeNodeStop()} with {@link
NodeStoppingException} before the {@link #busyLock} is blocked.
*/
private final CompletableFuture<Void> stopReplicaLifecycleFuture = new
CompletableFuture<>();
+ private final LongPriorityQueue<DestroyZoneEvent> destructionEventsQueue =
+ new LongPriorityQueue<>(DestroyZoneEvent::catalogVersion);
+
/**
* The constructor.
*
@@ -457,7 +475,7 @@ public class PartitionReplicaLifecycleManager extends
assert recoveryFinishFuture.isDone();
long recoveryRevision = recoveryFinishFuture.join().revision();
- cleanUpResourcesForDroppedZonesOnRecovery();
+ handleResourcesForDroppedZonesOnRecovery();
HybridTimestamp safeLwm = catalogSafeLowWatermark(lowWatermark,
catalogService);
CompletableFuture<Void> processZonesAndAssignmentsOnStart =
processZonesOnStart(recoveryRevision, safeLwm)
@@ -468,6 +486,9 @@ public class PartitionReplicaLifecycleManager extends
metaStorageMgr.registerPrefixWatch(new
ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES),
assignmentsSwitchRebalanceListener);
catalogService.listen(ZONE_CREATE, onCreateZoneListener);
+ catalogService.listen(ZONE_DROP, onZoneDropListener);
+
+ lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
onLowWatermarkChangedListener);
rebalanceRetryDelayConfiguration.init();
@@ -495,12 +516,34 @@ public class PartitionReplicaLifecycleManager extends
var startedZones = new IntOpenHashSet();
var startZoneFutures = new ArrayList<CompletableFuture<?>>();
+ Catalog nextCatalog = null;
+
for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion;
ver--) {
+ Catalog catalog = catalogService.catalog(ver);
+ Catalog finalNextCatalog = nextCatalog;
+
int ver0 = ver;
catalogService.catalog(ver).zones().stream()
.filter(zone -> startedZones.add(zone.id()))
- .forEach(zoneDescriptor -> startZoneFutures.add(
-
calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0,
zoneDescriptor, true)));
+ .forEach(zoneDescriptor -> {
+ int zoneId = zoneDescriptor.id();
+
+ startZoneFutures.add(
+
calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0,
zoneDescriptor, true));
+
+ // Handle missed zone drop event.
+ if (finalNextCatalog != null &&
finalNextCatalog.zone(zoneId) == null) {
+ destructionEventsQueue.enqueue(
+ new DestroyZoneEvent(
+ finalNextCatalog.version(),
+ zoneId,
+ zoneDescriptor.partitions()
+ )
+ );
+ }
+ });
+
+ nextCatalog = catalog;
}
return allOf(startZoneFutures.toArray(CompletableFuture[]::new))
@@ -570,8 +613,8 @@ public class PartitionReplicaLifecycleManager extends
}
}
- private void cleanUpResourcesForDroppedZonesOnRecovery() {
- // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from vault and metastore
+ private void handleResourcesForDroppedZonesOnRecovery() {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-27466 Handle
abandoned resources for dropped zones from vault and metastore
}
private CompletableFuture<Boolean> onCreateZone(CreateZoneEventParameters
createZoneEventParameters) {
@@ -927,7 +970,10 @@ public class PartitionReplicaLifecycleManager extends
executorInclinedPlacementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
onPrimaryReplicaExpiredListener);
+ lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED,
onLowWatermarkChangedListener);
+
catalogService.removeListener(ZONE_CREATE, onCreateZoneListener);
+ catalogService.removeListener(ZONE_DROP, onZoneDropListener);
metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener);
metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
@@ -1669,7 +1715,7 @@ public class PartitionReplicaLifecycleManager extends
Assignments pendingAssignments,
long revision
) {
- Entry reduceEntry =
metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId),
revision);
+ Entry reduceEntry =
metaStorageMgr.getLocally(switchReduceKey(replicaGrpId), revision);
Assignments reduceAssignments = reduceEntry != null
? Assignments.fromBytes(reduceEntry.value())
@@ -2045,6 +2091,174 @@ public class PartitionReplicaLifecycleManager extends
return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
}
+    /**
+     * Reacts to a zone drop by queueing a {@link DestroyZoneEvent} for deferred processing.
+     *
+     * <p>The partition count is read from the zone descriptor found in the catalog version directly preceding the drop event version.
+     *
+     * @param parameters Zone drop event parameters.
+     */
+    private void onZoneDrop(DropZoneEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int dropCatalogVersion = parameters.catalogVersion();
+            int lastCatalogVersionWithZone = dropCatalogVersion - 1;
+            int droppedZoneId = parameters.zoneId();
+
+            CatalogZoneDescriptor droppedZone = catalogService.catalog(lastCatalogVersionWithZone).zone(droppedZoneId);
+
+            assert droppedZone != null
+                    : "Unexpected null zone descriptor for zoneId=" + droppedZoneId + ", catalogVersion "
+                    + lastCatalogVersionWithZone;
+
+            DestroyZoneEvent destroyEvent = new DestroyZoneEvent(dropCatalogVersion, droppedZoneId, droppedZone.partitions());
+
+            destructionEventsQueue.enqueue(destroyEvent);
+        });
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not "thread-safe" in case of concurrent disaster recovery or rebalances.
+    /**
+     * Low watermark change handler: drains the zone destruction events queue up to the catalog version that is active at the new
+     * low watermark and attempts to destroy partitions of every drained zone.
+     *
+     * @param parameters Low watermark change parameters.
+     * @return Future completed with {@code false} on every non-failing path, or a failed future if draining threw.
+     */
+    private CompletableFuture<Boolean> onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+        boolean enteredBusy = busyLock.enterBusy();
+
+        if (!enteredBusy) {
+            return falseCompletedFuture();
+        }
+
+        try {
+            long newLwm = parameters.newLowWatermark().longValue();
+            int earliestCatalogVersion = catalogService.activeCatalogVersion(newLwm);
+
+            // Run zone destruction fully asynchronously.
+            destructionEventsQueue.drainUpTo(earliestCatalogVersion)
+                    .forEach(destroyEvent -> removeZonePartitionsIfPossible(destroyEvent));
+
+            return falseCompletedFuture();
+        } catch (Throwable failure) {
+            return failedFuture(failure);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * Tries to destroy local resources of every partition of a dropped zone. For each partition performs following actions.
+     *
+     * <ol>
+     *     <li>Check whether it's started or await if it's starting.</li>
+     *     <li>Check whether the partition is eligible for removal - has zero table resources and empty txStateStorage.</li>
+     *     <li>Stop partition, destroy zone partition resources and unregister it from within startedReplicationGroups.</li>
+     *     <li>Remove partition assignments from meta storage.</li>
+     * </ol>
+     *
+     * <p>If at least one partition could not be removed, the event is put back to {@code destructionEventsQueue}; otherwise
+     * {@code distributionZoneMgr.onDropZoneDestroy} is invoked for the zone. The method does not wait for the asynchronous work.
+     *
+     * @param event Zone destruction event.
+     */
+    private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+        int zoneId = event.zoneId();
+        int partitionsCount = event.partitions();
+
+        List<CompletableFuture<Boolean>> partitionsEligibilityForRemovalFutures = new ArrayList<>();
+        for (int partitionIndex = 0; partitionIndex < partitionsCount; partitionIndex++) {
+            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, partitionIndex);
+
+            CompletableFuture<Boolean> partitionRemovalFuture =
+                    startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+                            .thenComposeAsync(started -> {
+                                // NOTE(review): a partition that is neither started nor starting yields false here, which makes
+                                // the whole event be re-enqueued below. Confirm this is intended for partitions that are not
+                                // hosted locally or that were already destroyed by an earlier attempt.
+                                if (!started) {
+                                    return falseCompletedFuture();
+                                }
+
+                                return inBusyLockAsync(busyLock, () ->
+                                        isEligibleForDrop(zonePartitionId)
+                                                .thenCompose(eligible -> {
+                                                    if (!eligible) {
+                                                        return falseCompletedFuture();
+                                                    }
+
+                                                    // It's safe to use -1 as revision id here, since we only stop partitions that do not
+                                                    // have active table-related resources.
+                                                    return stopAndDestroyPartition(zonePartitionId, -1)
+                                                            .thenCompose(v -> dropAssignments(zonePartitionId))
+                                                            .thenApply(v -> true);
+                                                })
+                                );
+                            }, partitionOperationsExecutor)
+                            .exceptionally(e -> {
+                                if (!hasCause(e, NodeStoppingException.class)) {
+                                    LOG.error(
+                                            "Unable to destroy zone partition [zonePartitionId={}]",
+                                            e,
+                                            zonePartitionId);
+                                }
+
+                                // In case of false, event will be returned to the destructionEventsQueue and thus removal will be retried
+                                // on next iteration of LWM change.
+                                return false;
+                            });
+
+            partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
+        }
+
+        // If there's a partition that still has non-empty resources e.g. non-empty txnStateStorage, the event is returned
+        // back to destructionEventsQueue and thus will be re-processed on next lwm change.
+        allOf(partitionsEligibilityForRemovalFutures.toArray(CompletableFuture[]::new))
+                .thenApply(unused -> partitionsEligibilityForRemovalFutures.stream().anyMatch(f -> !f.join()))
+                .thenAccept(anyFalse -> {
+                    if (anyFalse) {
+                        destructionEventsQueue.enqueue(event);
+                    } else {
+                        distributionZoneMgr.onDropZoneDestroy(zoneId, event.catalogVersion())
+                                .whenComplete((r, e) -> {
+                                    if (e != null) {
+                                        LOG.error(
+                                                "Unable to destroy zone resources [zoneId={}]",
+                                                e,
+                                                zoneId);
+                                    }
+                                });
+                    }
+                })
+                .whenComplete((unused, e) -> {
+                    // The resulting future is not returned to anyone, so a failure of the final stage (e.g. an exception thrown
+                    // while re-enqueueing the event or while invoking the zone manager) would otherwise be lost silently.
+                    if (e != null && !hasCause(e, NodeStoppingException.class)) {
+                        LOG.error(
+                                "Unable to finish zone destruction attempt [zoneId={}]",
+                                e,
+                                zoneId);
+                    }
+                });
+    }
+
+    /**
+     * Tells whether a zone partition may be removed: its tx state storage must contain no entries and it must have no table
+     * resources left. A partition without registered zone partition resources is treated as eligible.
+     *
+     * @param zonePartitionId Zone partition identifier.
+     * @return Future with {@code true} if the partition can be dropped, {@code false} otherwise.
+     */
+    private CompletableFuture<Boolean> isEligibleForDrop(ZonePartitionId zonePartitionId) {
+        ZonePartitionResources resources = zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+
+        if (resources == null) {
+            return trueCompletedFuture();
+        }
+
+        boolean txStateStorageIsEmpty;
+
+        try (var txStatesCursor = resources.txStatePartitionStorage().scan()) {
+            txStateStorageIsEmpty = !txStatesCursor.hasNext();
+        } catch (TxStateStorageRebalanceException ignored) {
+            // A storage that reports a rebalance-related failure is reported as not eligible.
+            txStateStorageIsEmpty = false;
+        }
+
+        if (!txStateStorageIsEmpty) {
+            return falseCompletedFuture();
+        }
+
+        // In order to simplify the logic of table resources cleanup, that is handled by TableManager and
+        // zone resources cleanup that is handled by PartitionReplicaLifecycleManager, on zone destruction event
+        // we await TableManager to finish its own job instead of stealing it.
+        // In case when both tables and zone were destroyed on the same lwm, PartitionReplicaLifecycleManager will cleanup
+        // zone resources within the next lwm update.
+        return zoneResourcesManager.areTableResourcesEmpty(zonePartitionId);
+    }
+
+    /**
+     * Removes the assignment-related meta storage keys of the given zone partition: stable, pending queue, pending change trigger,
+     * planned, switch append, switch reduce and assignments chain. A removal failure is logged and left in the returned future.
+     *
+     * @param zonePartitionId Zone partition identifier.
+     * @return Future of the removal.
+     */
+    private CompletableFuture<Void> dropAssignments(ZonePartitionId zonePartitionId) {
+        Set<ByteArray> keysToRemove = Set.of(
+                stablePartAssignmentsKey(zonePartitionId),
+                pendingPartAssignmentsQueueKey(zonePartitionId),
+                pendingChangeTriggerKey(zonePartitionId),
+                plannedPartAssignmentsKey(zonePartitionId),
+                switchAppendKey(zonePartitionId),
+                switchReduceKey(zonePartitionId),
+                assignmentsChainKey(zonePartitionId)
+        );
+
+        CompletableFuture<Void> removalFuture = metaStorageMgr.removeAll(keysToRemove);
+
+        return removalFuture.whenComplete((unused, failure) -> {
+            if (failure == null) {
+                return;
+            }
+
+            LOG.error("Failed to remove assignments from metastorage [zonePartitionId={}]", failure, zonePartitionId);
+        });
+    }
+
/**
* For HA zones: Check that last rebalance was graceful (caused by common
rebalance triggers, like data nodes change, replica factor
* change, etc.) rather than forced (caused by a disaster recovery reset
after losing the majority of nodes).
@@ -2054,6 +2268,35 @@ public class PartitionReplicaLifecycleManager extends
return assignmentsChain == null || assignmentsChain.size() == 1;
}
+    /** Internal event: immutable description of a zone whose partitions are queued for destruction. */
+    private static class DestroyZoneEvent {
+        /** Catalog version supplied by the event creator. */
+        final int catalogVersion;
+
+        /** Identifier of the zone to destroy. */
+        final int zoneId;
+
+        /** Number of partitions of the zone. */
+        final int partitions;
+
+        /**
+         * Constructor.
+         *
+         * @param catalogVersion Catalog version associated with the event.
+         * @param zoneId Zone identifier.
+         * @param partitions Partitions count of the zone.
+         */
+        DestroyZoneEvent(int catalogVersion, int zoneId, int partitions) {
+            this.catalogVersion = catalogVersion;
+            this.zoneId = zoneId;
+            this.partitions = partitions;
+        }
+
+        /** Returns the catalog version associated with the event. */
+        int catalogVersion() {
+            return catalogVersion;
+        }
+
+        /** Returns the zone identifier. */
+        int zoneId() {
+            return zoneId;
+        }
+
+        /** Returns the partitions count of the zone. */
+        int partitions() {
+            return partitions;
+        }
+    }
+
/**
* Factory for creating table partition replica processors.
*/
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
index cb66641923d..35816ab71cf 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.partition.replicator;
import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import java.util.Map;
import java.util.Set;
@@ -100,6 +102,23 @@ class StartedReplicationGroups {
return startedReplicationGroupIds.contains(zonePartitionId);
}
+    /**
+     * Tells whether the replication group is started, waiting for the startup to finish if the group is being started right now.
+     *
+     * @param zonePartitionId Replication group identifier.
+     * @return Future with {@code true} if the group is already started or completes its startup, {@code false} if the group is
+     *     neither started nor starting. If the awaited startup future fails, the returned future fails as well.
+     */
+    CompletableFuture<Boolean> hasReplicationGroupStartedOrAwaitIfStarting(ZonePartitionId zonePartitionId) {
+        if (startedReplicationGroupIds.contains(zonePartitionId)) {
+            return trueCompletedFuture();
+        }
+
+        CompletableFuture<Void> groupStartingFuture = startingReplicationGroupIds.get(zonePartitionId);
+        if (groupStartingFuture != null) {
+            return groupStartingFuture.thenApply(ignored -> true);
+        }
+
+        // The two lookups above are not atomic: the group might have finished starting in between, in which case it is no longer
+        // present among the starting ones. Re-check the started set to avoid reporting such a group as not started.
+        // NOTE(review): this presumes the group is registered as started before its starting future is removed - confirm.
+        if (startedReplicationGroupIds.contains(zonePartitionId)) {
+            return trueCompletedFuture();
+        }
+
+        return falseCompletedFuture();
+    }
+
private void completeStartingFuture(ZonePartitionId zonePartitionId) {
CompletableFuture<Void> startingFuture =
startingReplicationGroupIds.remove(zonePartitionId);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 61f8e95e3cf..860204fea6e 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -330,6 +330,13 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
return processor == null ? null : processor.txRwOperationTracker();
}
+ /**
+ * Tells whether this listener currently holds no table replica processors.
+ *
+ * @return {@code true} if {@code replicaProcessors} is empty, {@code false} otherwise.
+ */
+ boolean areTableReplicaProcessorsEmpty() {
+ return replicaProcessors.isEmpty();
+ }
+
/**
* Return table replicas listeners.
*
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index c32da64d051..552984b8784 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.partition.replicator;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import java.util.Map;
@@ -211,6 +212,24 @@ public class ZoneResourcesManager implements
ManuallyCloseable {
});
}
+    /**
+     * Tells whether the given zone partition has no table-related resources.
+     *
+     * <p>If there are no zone partition resources at all, the answer is {@code true}. Otherwise the replica listener future is
+     * awaited and then table replica processors, table raft processors and partition snapshot storages are checked in that order;
+     * the answer is {@code true} only if all of them are empty.
+     *
+     * @param zonePartitionId Zone partition identifier.
+     * @return Future with the result of the check.
+     */
+    CompletableFuture<Boolean> areTableResourcesEmpty(ZonePartitionId zonePartitionId) {
+        ZonePartitionResources partitionResources = getZonePartitionResources(zonePartitionId);
+
+        if (partitionResources == null) {
+            return trueCompletedFuture();
+        }
+
+        return partitionResources.replicaListenerFuture.thenApply(replicaListener -> {
+            if (!replicaListener.areTableReplicaProcessorsEmpty()) {
+                return false;
+            }
+
+            if (!partitionResources.raftListener().areTableRaftProcessorsEmpty()) {
+                return false;
+            }
+
+            return partitionResources.snapshotStorage().arePartitionSnapshotStoragesEmpty();
+        });
+    }
+
@TestOnly
@Nullable
TxStatePartitionStorage txStatePartitionStorage(int zoneId, int
partitionId) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index e9117c93503..3a2114ae7ee 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -491,6 +491,15 @@ public class ZonePartitionRaftListener implements
RaftGroupListener {
}
}
+ /**
+ * Tells whether this listener currently holds no table processors. The check is performed while holding
+ * {@code tableProcessorsStateLock}.
+ *
+ * @return {@code true} if {@code tableProcessors} is empty, {@code false} otherwise.
+ */
+ public boolean areTableRaftProcessorsEmpty() {
+ synchronized (tableProcessorsStateLock) {
+ return tableProcessors.isEmpty();
+ }
+ }
+
private void cleanupSnapshots() {
partitionsSnapshots.cleanupOutgoingSnapshots(partitionKey);
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index 705c598a1ef..c98b3315643 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -205,6 +205,15 @@ public class PartitionSnapshotStorage {
}
}
+ /**
+ * Returns true if there are neither ongoing snapshot operations nor
partition snapshot storages, false otherwise. Both collections are checked while holding
{@code snapshotOperationLock}.
+ */
+ public boolean arePartitionSnapshotStoragesEmpty() {
+ synchronized (snapshotOperationLock) {
+ return ongoingSnapshotOperations.isEmpty() &&
partitionsByTableId.isEmpty();
+ }
+ }
+
/**
* Returns the TX state storage.
*/
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index 5c92b69c99c..f16bc19ca4f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -55,9 +55,9 @@ import
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
import
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.LongPriorityQueue;
import org.jetbrains.annotations.TestOnly;
/**
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java
index b448cc4f164..a63274710d0 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java
@@ -23,8 +23,18 @@ import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerKey;
+import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.assignmentsChainKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingChangeTriggerKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.plannedPartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchAppendKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchReduceKey;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasSize;
@@ -32,6 +42,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.io.FileMatchers.anExistingFile;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
import java.io.File;
import java.nio.ByteBuffer;
@@ -53,17 +64,19 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.configuration.IgnitePaths;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.replicator.PartitionGroupId;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbPartitionStorage;
import org.apache.ignite.internal.vault.VaultService;
import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -137,7 +150,6 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-24345")
void partitionIsDestroyedOnZoneDestruction() throws Exception {
cluster.startAndInit(1,
ItPartitionDestructionTest::aggressiveLowWatermarkIncrease);
@@ -151,10 +163,23 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
makeSurePartitionExistsOnDisk(ignite0, tableId, replicationGroupId);
+ verifyZoneDistributionZoneManagerResourcesArePresent(ignite0,
replicationGroupId.zoneId());
+
executeUpdate("DROP TABLE " + TABLE_NAME);
executeUpdate("DROP ZONE " + ZONE_NAME);
+ verifyPartitionMvDataGetsRemovedFromDisk(ignite0, tableId,
replicationGroupId);
+
+ verifyPartitionNonMvDataExistsOnDisk(ignite0, replicationGroupId);
+
+ // Trigger txStateStorage vacuumization that will remove all records
from the storage and thus make it eligible for removal.
+
assertThat(ignite0.txManager().vacuum(mock(ResourceVacuumMetrics.class)),
willCompleteSuccessfully());
+
verifyPartitionGetsFullyRemovedFromDisk(ignite0, tableId,
replicationGroupId);
+
+ verifyAssignmentKeysWereRemovedFromMetaStorage(ignite0,
replicationGroupId);
+
+
verifyZoneDistributionZoneManagerResourcesWereRemovedFromMetaStorage(ignite0,
replicationGroupId.zoneId());
}
/**
@@ -208,8 +233,17 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-24345")
- void partitionIsDestroyedOnZoneDestructionOnNodeRecovery() throws
Exception {
+ public void
partitionIsDestroyedOnZoneDestructionOnNodeRecoveryIfLwmIsNotMovedWhileNodeIsAbsent()
throws Exception {
+ verifyPartitionIsDestroyedOnZoneDestructionOnNodeRecovery(false);
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-27466")
+ public void
partitionIsDestroyedOnZoneDestructionOnNodeRecoveryIfLwmIsMovedWhileNodeIsAbsent()
throws Exception {
+ verifyPartitionIsDestroyedOnZoneDestructionOnNodeRecovery(true);
+ }
+
+ private void
verifyPartitionIsDestroyedOnZoneDestructionOnNodeRecovery(boolean
moveLwmWhileNodeIsAbsent) throws Exception {
cluster.startAndInit(1,
ItPartitionDestructionTest::aggressiveLowWatermarkIncrease);
IgniteImpl ignite0 = unwrapIgniteImpl(cluster.node(0));
Path workDir0 = ((IgniteServerImpl) cluster.server(0)).workDir();
@@ -234,15 +268,28 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
executeUpdate("DROP ZONE " + ZONE_NAME);
HybridTimestamp tsAfterDrop = latestCatalogVersionTs(ignite0);
+ verifyPartitionNonMvDataExistsOnDisk(ignite0, replicationGroupId);
+
+ verifyZoneDistributionZoneManagerResourcesArePresent(ignite0,
replicationGroupId.zoneId());
+
+ // Trigger txStateStorage vacuumization that will remove all records
from the storage and thus make it eligible for removal.
+
assertThat(ignite0.txManager().vacuum(mock(ResourceVacuumMetrics.class)),
willCompleteSuccessfully());
+
cluster.stopNode(0);
- // Simulate a situation when an LWM was raised (and persisted) and the
node was stopped immediately, so we were not
- // able to destroy the dropped zone yet, even though its drop moment
is already under the LWM.
- raisePersistedLwm(workDir0, tsAfterDrop.tick());
+ if (moveLwmWhileNodeIsAbsent) {
+ // Simulate a situation when an LWM was raised (and persisted) and
the node was stopped immediately, so we were not
+ // able to destroy the dropped zone yet, even though its drop
moment is already under the LWM.
+ raisePersistedLwm(workDir0, tsAfterDrop.tick());
+ }
IgniteImpl restartedIgnite0 = unwrapIgniteImpl(cluster.startNode(0));
verifyPartitionGetsFullyRemovedFromDisk(restartedIgnite0, tableId,
replicationGroupId);
+
+ verifyAssignmentKeysWereRemovedFromMetaStorage(restartedIgnite0,
replicationGroupId);
+
+
verifyZoneDistributionZoneManagerResourcesWereRemovedFromMetaStorage(restartedIgnite0,
replicationGroupId.zoneId());
}
/**
@@ -371,7 +418,7 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
return table.zoneId();
}
- private static void makeSurePartitionExistsOnDisk(IgniteImpl ignite, int
tableId, PartitionGroupId replicationGroupId) {
+ private static void makeSurePartitionExistsOnDisk(IgniteImpl ignite, int
tableId, ZonePartitionId replicationGroupId) {
makeSurePartitionMvDataExistsOnDisk(ignite, tableId);
assertTrue(hasSomethingInTxStateStorage(ignite, replicationGroupId));
@@ -428,20 +475,20 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
.array();
}
- private static File partitionRaftMetaFile(IgniteImpl ignite,
ReplicationGroupId replicationGroupId) {
+ private static File partitionRaftMetaFile(IgniteImpl ignite,
ZonePartitionId replicationGroupId) {
String relativePath = partitionRaftNodeIdStringForStorage(ignite,
replicationGroupId) + "/meta/raft_meta";
Path raftMetaFilePath =
ignite.partitionsWorkDir().metaPath().resolve(relativePath);
return raftMetaFilePath.toFile();
}
- private static LogStorage partitionLogStorage(IgniteImpl ignite,
ReplicationGroupId replicationGroupId) {
+ private static LogStorage partitionLogStorage(IgniteImpl ignite,
ZonePartitionId replicationGroupId) {
return ignite.partitionsLogStorageFactory().createLogStorage(
partitionRaftNodeIdStringForStorage(ignite,
replicationGroupId),
new RaftOptions()
);
}
- private static String partitionRaftNodeIdStringForStorage(IgniteImpl
ignite, ReplicationGroupId replicationGroupId) {
+ private static String partitionRaftNodeIdStringForStorage(IgniteImpl
ignite, ZonePartitionId replicationGroupId) {
RaftNodeId partitionRaftNodeId = new RaftNodeId(replicationGroupId,
new Peer(ignite.name()));
return partitionRaftNodeId.nodeIdStringForStorage();
}
@@ -453,7 +500,7 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
private static void verifyPartitionGetsFullyRemovedFromDisk(
IgniteImpl ignite,
int tableId,
- PartitionGroupId replicationGroupId
+ ZonePartitionId replicationGroupId
) throws InterruptedException {
verifyPartitionGetsRemovedFromDisk(ignite, tableId,
replicationGroupId, true);
}
@@ -461,7 +508,7 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
private static void verifyPartitionMvDataGetsRemovedFromDisk(
IgniteImpl ignite,
int tableId,
- PartitionGroupId replicationGroupId
+ ZonePartitionId replicationGroupId
) throws InterruptedException {
verifyPartitionGetsRemovedFromDisk(ignite, tableId,
replicationGroupId, false);
}
@@ -469,39 +516,63 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
private static void verifyPartitionGetsRemovedFromDisk(
IgniteImpl ignite,
int tableId,
- PartitionGroupId replicationGroupId,
+ ZonePartitionId replicationGroupId,
boolean concernNonMvData
) throws InterruptedException {
File partitionFile = testTablePartition0File(ignite, tableId);
- assertTrue(
- waitForCondition(() -> !partitionFile.exists(),
SECONDS.toMillis(10)),
- "Partition file " + partitionFile.getAbsolutePath() + " was
not removed in time"
- );
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ assertThat(
+ "Partition file " +
partitionFile.getAbsolutePath() + " was not removed in time.",
+ !partitionFile.exists()
+ );
+ });
if (concernNonMvData) {
- assertTrue(
- waitForCondition(() ->
!hasSomethingInTxStateStorage(ignite, replicationGroupId),
SECONDS.toMillis(10)),
- "Tx state storage was not destroyed in time"
- );
-
- assertTrue(
- waitForCondition(() -> partitionLogStorage(ignite,
replicationGroupId).getLastLogIndex() == 0L, SECONDS.toMillis(10)),
- "Partition Raft log was not removed in time"
- );
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ assertThat(
+ "Tx state storage was not destroyed in time.",
+ !hasSomethingInTxStateStorage(ignite,
replicationGroupId)
+ );
+ });
+
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ assertThat(
+ "Partition Raft log was not removed in time.",
+ partitionLogStorage(ignite,
replicationGroupId).getLastLogIndex(), is(0L)
+ );
+ });
File raftMetaFile = partitionRaftMetaFile(ignite,
replicationGroupId);
- assertTrue(
- waitForCondition(() -> !raftMetaFile.exists(),
SECONDS.toMillis(10)),
- "Partition Raft meta file " +
raftMetaFile.getAbsolutePath() + " was not removed in time"
- );
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ assertThat(
+ "Partition Raft meta file " +
raftMetaFile.getAbsolutePath() + " was not removed in time.",
+ !raftMetaFile.exists()
+ );
+ });
}
}
+    /**
+     * Asserts that non-MV data of the partition is still present: the tx state storage has something in it, the partition Raft log
+     * has a positive last index and the Raft meta file exists.
+     */
+    private static void verifyPartitionNonMvDataExistsOnDisk(IgniteImpl ignite, ZonePartitionId replicationGroupId) {
+        boolean txStatePresent = hasSomethingInTxStateStorage(ignite, replicationGroupId);
+        assertTrue(txStatePresent, "Tx state storage was unexpectedly destroyed");
+
+        long lastLogIndex = partitionLogStorage(ignite, replicationGroupId).getLastLogIndex();
+        assertTrue(lastLogIndex > 0L, "Partition Raft log was unexpectedly removed.");
+
+        File raftMetaFile = partitionRaftMetaFile(ignite, replicationGroupId);
+        boolean raftMetaFilePresent = raftMetaFile.exists();
+        assertTrue(
+                raftMetaFilePresent,
+                "Partition Raft meta file " + raftMetaFile.getAbsolutePath() + " was unexpectedly removed."
+        );
+    }
+
private static void waitTillCatalogDoesNotContainTargetTable(IgniteImpl
ignite, String tableName) throws InterruptedException {
- assertTrue(
- waitForCondition(() ->
!catalogHistoryContainsTargetTable(ignite, tableName), SECONDS.toMillis(10)),
- "Did not observe catalog truncation in time"
- );
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ assertThat(
+ "Did not observe catalog truncation in time.",
+ !catalogHistoryContainsTargetTable(ignite,
tableName)
+ );
+ });
}
private static boolean catalogHistoryContainsTargetTable(IgniteImpl
ignite, String tableName) {
@@ -550,15 +621,18 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
verifyPartitionGetsFullyRemovedFromDisk(notHostingIgnite, tableId,
replicationGroupId);
}
- private void waitTillAssignmentCountReaches(int targetAssignmentCount,
ReplicationGroupId replicationGroupId)
+ private void waitTillAssignmentCountReaches(int targetAssignmentCount,
ZonePartitionId replicationGroupId)
throws InterruptedException {
- assertTrue(
- waitForCondition(() ->
partitionAssignments(replicationGroupId).size() == targetAssignmentCount,
SECONDS.toMillis(10)),
- "Did not see assignments count reaching " +
targetAssignmentCount
- );
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ assertThat(
+ "Did not see assignments count reaching.",
+ partitionAssignments(replicationGroupId).size() ==
targetAssignmentCount
+ );
+ });
}
- private Set<Assignment> partitionAssignments(ReplicationGroupId
replicationGroupId) {
+ private Set<Assignment> partitionAssignments(ZonePartitionId
replicationGroupId) {
IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
CompletableFuture<TokenizedAssignments> assignmentsFuture =
ignite.placementDriver()
@@ -571,7 +645,7 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
return assignments.nodes();
}
- private IgniteImpl nodeNotHostingPartition(ReplicationGroupId
replicationGroupId) throws InterruptedException {
+ private IgniteImpl nodeNotHostingPartition(ZonePartitionId
replicationGroupId) throws InterruptedException {
waitTillAssignmentCountReaches(1, replicationGroupId);
Set<Assignment> assignments = partitionAssignments(replicationGroupId);
@@ -584,4 +658,129 @@ class ItPartitionDestructionTest extends
ClusterPerTestIntegrationTest {
.map(TestWrappers::unwrapIgniteImpl)
.orElseThrow();
}
+
+ /**
+ * Checks that every assignment-related meta storage key of the given zone partition eventually disappears.
+ *
+ * <p>The keys are polled one after another via {@code getLocally} on the node's meta storage manager. Each key gets its own
+ * 10 second budget and counts as removed once its entry is either a tombstone or empty.
+ *
+ * @param ignite Node whose meta storage is inspected.
+ * @param zonePartitionId Zone partition whose assignment keys are expected to be gone.
+ */
+ private static void verifyAssignmentKeysWereRemovedFromMetaStorage(IgniteImpl ignite, ZonePartitionId zonePartitionId)
+ throws InterruptedException {
+ MetaStorageManager nodeMetaStorage = unwrapIgniteImpl(ignite).metaStorageManager();
+
+ // Stable assignments.
+ await().atMost(10, SECONDS).untilAsserted(() -> {
+ Entry stable = nodeMetaStorage.getLocally(stablePartAssignmentsKey(zonePartitionId));
+ boolean removed = stable.tombstone() || stable.empty();
+
+ assertThat("Stable assignments were not removed from meta storage in time.", removed);
+ });
+
+ // Pending assignments queue.
+ await().atMost(10, SECONDS).untilAsserted(() -> {
+ Entry pending = nodeMetaStorage.getLocally(pendingPartAssignmentsQueueKey(zonePartitionId));
+ boolean removed = pending.tombstone() || pending.empty();
+
+ assertThat("Pending assignments were not removed from meta storage in time.", removed);
+ });
+
+ // Pending change trigger.
+ await().atMost(10, SECONDS).untilAsserted(() -> {
+ Entry pendingTrigger = nodeMetaStorage.getLocally(pendingChangeTriggerKey(zonePartitionId));
+ boolean removed = pendingTrigger.tombstone() || pendingTrigger.empty();
+
+ assertThat("Pending change trigger key was not removed from meta storage in time.", removed);
+ });
+
+ // Planned assignments.
+ await().atMost(10, SECONDS).untilAsserted(() -> {
+ Entry planned = nodeMetaStorage.getLocally(plannedPartAssignmentsKey(zonePartitionId));
+ boolean removed = planned.tombstone() || planned.empty();
+
+ assertThat("Planned assignments were not removed from meta storage in time.", removed);
+ });
+
+ // Switch append.
+ await().atMost(10, SECONDS).untilAsserted(() -> {
+ Entry switchAppend = nodeMetaStorage.getLocally(switchAppendKey(zonePartitionId));
+ boolean removed = switchAppend.tombstone() || switchAppend.empty();
+
+ assertThat("Switch append assignments were not removed from meta storage in time.", removed);
+ });
+
+ // Switch reduce.
+ await().atMost(10, SECONDS).untilAsserted(() -> {
+ Entry switchReduce = nodeMetaStorage.getLocally(switchReduceKey(zonePartitionId));
+ boolean removed = switchReduce.tombstone() || switchReduce.empty();
+
+ assertThat("Switch reduce assignments were not removed from meta storage in time.", removed);
+ });
+
+ // Assignments chain.
+ await().atMost(10, SECONDS).untilAsserted(() -> {
+ Entry chain = nodeMetaStorage.getLocally(assignmentsChainKey(zonePartitionId));
+ boolean removed = chain.tombstone() || chain.empty();
+
+ assertThat("Assignments chain was not removed from meta storage in time.", removed);
+ });
+ }
+
+ /**
+ * Checks that the zone's data nodes history key and its scale up / scale down timer keys are present in the node's
+ * meta storage, each polled via {@code getLocally} for up to 10 seconds.
+ *
+ * <p>A key counts as present only when its entry is neither empty nor a tombstone. The removal checks in this class treat
+ * a tombstone as "removed", so a tombstone must not be accepted as "present" here.
+ *
+ * @param ignite Node whose meta storage is inspected.
+ * @param zoneId ID of the zone whose keys are expected to exist.
+ */
+ private static void verifyZoneDistributionZoneManagerResourcesArePresent(IgniteImpl ignite, int zoneId)
+ throws InterruptedException {
+ MetaStorageManager metaStorage = unwrapIgniteImpl(ignite).metaStorageManager();
+
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ Entry entry = metaStorage.getLocally(zoneDataNodesHistoryKey(zoneId));
+ assertThat(
+ "Zone data nodes are not present in meta storage.",
+ !entry.empty() && !entry.tombstone()
+ );
+ });
+
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ Entry entry = metaStorage.getLocally(zoneScaleUpTimerKey(zoneId));
+ assertThat(
+ "Zone scale up timer is not present in meta storage.",
+ !entry.empty() && !entry.tombstone()
+ );
+ });
+
+ await().atMost(10, SECONDS)
+ .untilAsserted(() -> {
+ Entry entry = metaStorage.getLocally(zoneScaleDownTimerKey(zoneId));
+ assertThat(
+ "Zone scale down timer is not present in meta storage.",
+ !entry.empty() && !entry.tombstone()
+ );
+ });
+ }
+
+ /**
+ * Checks that the zone's data nodes history key and its scale up / scale down timer keys have been removed from the node's
+ * meta storage.
+ *
+ * <p>Each key is polled via {@code getLocally} for up to 10 seconds and counts as removed once its entry is a tombstone.
+ *
+ * @param ignite Node whose meta storage is inspected.
+ * @param zoneId ID of the zone whose keys are expected to be gone.
+ */
+ private static void verifyZoneDistributionZoneManagerResourcesWereRemovedFromMetaStorage(IgniteImpl ignite, int zoneId)
+ throws InterruptedException {
+ MetaStorageManager nodeMetaStorage = unwrapIgniteImpl(ignite).metaStorageManager();
+
+ // Data nodes history.
+ await().atMost(10, SECONDS).untilAsserted(() -> assertThat(
+ "Zone data nodes were not removed from meta storage in time.",
+ nodeMetaStorage.getLocally(zoneDataNodesHistoryKey(zoneId)).tombstone()
+ ));
+
+ // Scale up timer.
+ await().atMost(10, SECONDS).untilAsserted(() -> assertThat(
+ "Zone scale up timer was not removed from meta storage in time.",
+ nodeMetaStorage.getLocally(zoneScaleUpTimerKey(zoneId)).tombstone()
+ ));
+
+ // Scale down timer.
+ await().atMost(10, SECONDS).untilAsserted(() -> assertThat(
+ "Zone scale down timer was not removed from meta storage in time.",
+ nodeMetaStorage.getLocally(zoneScaleDownTimerKey(zoneId)).tombstone()
+ ));
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 6cb27eb4729..2743476c67e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -28,9 +28,7 @@ import static
java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
import static org.apache.ignite.internal.event.EventListener.fromConsumer;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
import static
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
@@ -77,7 +75,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
-import java.util.stream.IntStream;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -105,7 +102,6 @@ import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -158,7 +154,6 @@ import
org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
import
org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.LongPriorityQueue;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableViewInternal;
@@ -190,6 +185,7 @@ import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.LongPriorityQueue;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.IgniteSql;
@@ -978,6 +974,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
}
}
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not
"thread-safe" in case of concurrent disaster recovery or rebalances.
private CompletableFuture<Boolean>
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
if (!busyLock.enterBusy()) {
return falseCompletedFuture();
@@ -1255,20 +1252,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
InternalTable internalTable = table.internalTable();
- if (!nodeProperties.colocationEnabled()) {
- Set<ByteArray> assignmentKeys = IntStream.range(0,
internalTable.partitions())
- .mapToObj(p -> stablePartAssignmentsKey(new
TablePartitionId(tableId, p)))
- .collect(toSet());
-
- metaStorageMgr.removeAll(assignmentKeys)
- .whenComplete((v, e) -> {
- if (e != null) {
- LOG.error("Failed to remove assignments from
metastorage [tableId={}]", e, tableId);
- }
- });
- }
-
- return stopAndDestroyTablePartitions(table)
+ return stopAndDestroyTableProcessors(table)
.thenComposeAsync(unused -> inBusyLockAsync(busyLock, () ->
internalTable.storage().destroy()), ioExecutor)
.thenAccept(unused -> inBusyLock(busyLock, () -> {
tables.remove(tableId);
@@ -1534,7 +1518,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return CompletableFutures.allOf(storageFuts);
}
- private CompletableFuture<Void>
stopAndDestroyTablePartitions(TableViewInternal table) {
+ private CompletableFuture<Void>
stopAndDestroyTableProcessors(TableViewInternal table) {
InternalTable internalTable = table.internalTable();
int partitions = internalTable.partitions();
@@ -1549,20 +1533,13 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
return writeLockAcquisitionFuture.thenCompose(stamp -> {
CompletableFuture<?>[] stopReplicaAndDestroyFutures = new
CompletableFuture<?>[partitions];
- // TODO https://issues.apache.org/jira/browse/IGNITE-24345
- // Partitions should be stopped on the assignments change
event triggered by zone drop or alter.
- // Stop replica asynchronously, out of metastorage event
pipeline.
for (int partitionId = 0; partitionId < partitions;
partitionId++) {
CompletableFuture<Void> resourcesUnloadFuture;
- if (nodeProperties.colocationEnabled()) {
- resourcesUnloadFuture =
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
- new ZonePartitionId(internalTable.zoneId(),
partitionId),
- internalTable.tableId()
- );
- } else {
- resourcesUnloadFuture = nullCompletedFuture();
- }
+ resourcesUnloadFuture =
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
+ new ZonePartitionId(internalTable.zoneId(),
partitionId),
+ internalTable.tableId()
+ );
var tablePartitionId = new
TablePartitionId(internalTable.tableId(), partitionId);
@@ -1884,16 +1861,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
}
private void cleanUpResourcesForDroppedTablesOnRecoveryBusy(@Nullable
HybridTimestamp lwm) {
- // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones
from vault and metastore
+ // TODO: IGNITE-20384 Clean up abandoned resources for dropped tables
from vault and metastore
Set<Integer> aliveTableIds = aliveTables(catalogService, lwm);
destroyMvStoragesForTablesNotIn(aliveTableIds);
-
- if (!nodeProperties.colocationEnabled()) {
- destroyTxStateStoragesForTablesNotIn(aliveTableIds);
- destroyReplicationProtocolStoragesForTablesNotIn(aliveTableIds);
- }
}
private void destroyMvStoragesForTablesNotIn(Set<Integer> aliveTableIds) {