This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 87aafb41db1 IGNITE-24345 Defer zone destruction after drop (#7322)
87aafb41db1 is described below

commit 87aafb41db11ed213e795bbf0be3b976bc2845e2
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Dec 29 21:21:17 2025 +0200

    IGNITE-24345 Defer zone destruction after drop (#7322)
    
    Co-authored-by: Roman Puchkovskiy <[email protected]>
---
 .../handler/ClientPrimaryReplicaTracker.java       |   4 +-
 .../ignite/internal/util}/LongPriorityQueue.java   |   4 +-
 .../internal/util}/LongPriorityQueueSelfTest.java  |   2 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   2 +-
 .../distributionzones/DataNodesManager.java        |  20 +-
 .../distributionzones/DistributionZoneManager.java |  14 +-
 ...ibutionZoneManagerConfigurationChangesTest.java |  15 --
 ...istributionZoneManagerScaleUpScaleDownTest.java |  66 -----
 .../index/ChangeIndexStatusTaskController.java     |   2 +-
 .../apache/ignite/internal/index/IndexManager.java |   2 +-
 .../PartitionReplicaLifecycleManager.java          | 255 +++++++++++++++++-
 .../replicator/StartedReplicationGroups.java       |  19 ++
 .../replicator/ZonePartitionReplicaListener.java   |   7 +
 .../partition/replicator/ZoneResourcesManager.java |  19 ++
 .../replicator/raft/ZonePartitionRaftListener.java |   9 +
 .../raft/snapshot/PartitionSnapshotStorage.java    |   9 +
 .../engine/statistic/SqlStatisticManagerImpl.java  |   2 +-
 .../partition/ItPartitionDestructionTest.java      | 285 +++++++++++++++++----
 .../internal/table/distributed/TableManager.java   |  46 +---
 19 files changed, 590 insertions(+), 192 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
index f667a4cad70..e0cc976a9d8 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientPrimaryReplicaTracker.java
@@ -52,9 +52,9 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.LongPriorityQueue;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.LongPriorityQueue;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.jetbrains.annotations.Nullable;
 
@@ -341,7 +341,7 @@ public class ClientPrimaryReplicaTracker {
 
     private void onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
         inBusyLockSafe(busyLock, () -> {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-24345 - 
support zone destruction.
+            // TODO: https://issues.apache.org/jira/browse/IGNITE-25017 - 
support zone destruction.
             int earliestVersion = 
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
 
             List<DestroyTableEvent> events = 
destructionEventsQueue.drainUpTo(earliestVersion);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/LongPriorityQueue.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/LongPriorityQueue.java
similarity index 95%
rename from 
modules/table/src/main/java/org/apache/ignite/internal/table/LongPriorityQueue.java
rename to 
modules/core/src/main/java/org/apache/ignite/internal/util/LongPriorityQueue.java
index 7c73f382569..bf24227a3c4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/LongPriorityQueue.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/LongPriorityQueue.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.util;
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -24,7 +24,7 @@ import java.util.PriorityQueue;
 import java.util.function.ToLongFunction;
 
 /**
- * A thread-safe wrapper over {@link java.util.PriorityQueue}, which uses 
{@code long} value for ordering.
+ * A thread-safe wrapper over {@link PriorityQueue}, which uses {@code long} 
value for ordering.
  * The implementation provides a method to poll top item up to the given 
priority.
  *
  * @param <T> Item type.
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/LongPriorityQueueSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/util/LongPriorityQueueSelfTest.java
similarity index 99%
rename from 
modules/table/src/test/java/org/apache/ignite/internal/table/LongPriorityQueueSelfTest.java
rename to 
modules/core/src/test/java/org/apache/ignite/internal/util/LongPriorityQueueSelfTest.java
index 3a52ebd6980..22656443cc8 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/LongPriorityQueueSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/util/LongPriorityQueueSelfTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.table;
+package org.apache.ignite.internal.util;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.empty;
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 98d8d7aa0fb..1cdac08bce0 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -658,7 +658,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
     @Test
     @UseTestTxStateStorage
     @UseRocksMetaStorage
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24345")
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27467")
     void testDestroyPartitionStoragesOnRestartEvictedNode(TestInfo testInfo) 
throws Exception {
         Node node = getNode(0);
 
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
index d4777864412..b6357034452 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DataNodesManager.java
@@ -1389,8 +1389,8 @@ public class DataNodesManager {
         }
     }
 
-    CompletableFuture<?> onZoneDrop(int zoneId, HybridTimestamp timestamp) {
-        return removeDataNodesKeys(zoneId, timestamp)
+    CompletableFuture<?> onZoneDestroy(int zoneId, int dropZoneCatalogVersion) 
{
+        return removeDataNodesKeys(zoneId, dropZoneCatalogVersion)
                 .thenRun(() -> {
                     ZoneTimers zt = zoneTimers.remove(zoneId);
                     if (zt != null) {
@@ -1403,9 +1403,9 @@ public class DataNodesManager {
      * Method deletes data nodes related values for the specified zone.
      *
      * @param zoneId Unique id of a zone.
-     * @param timestamp Timestamp of an event that has triggered this method.
+     * @param dropZoneCatalogVersion Version of catalog when zone was dropped.
      */
-    private CompletableFuture<?> removeDataNodesKeys(int zoneId, 
HybridTimestamp timestamp) {
+    private CompletableFuture<?> removeDataNodesKeys(int zoneId, int 
dropZoneCatalogVersion) {
         if (!busyLock.enterBusy()) {
             throw new IgniteInternalException(NODE_STOPPING_ERR, new 
NodeStoppingException());
         }
@@ -1414,7 +1414,7 @@ public class DataNodesManager {
             Condition condition = exists(zoneDataNodesHistoryKey(zoneId));
 
             Update removeKeysUpd = ops(
-                    // TODO remove(zoneDataNodesHistoryKey(zoneId)), 
https://issues.apache.org/jira/browse/IGNITE-24345
+                    remove(zoneDataNodesHistoryKey(zoneId)),
                     remove(zoneScaleUpTimerKey(zoneId)),
                     remove(zoneScaleDownTimerKey(zoneId))
             ).yield(true);
@@ -1427,16 +1427,18 @@ public class DataNodesManager {
                         if (e != null) {
                             if (!relatesToNodeStopping(e)) {
                                 String errorMessage = String.format(
-                                        "Failed to delete zone's dataNodes 
keys [zoneId = %s, timestamp = %s]",
+                                        "Failed to delete zone's dataNodes 
keys [zoneId = %s, dropZoneCatalogVersion = %s]",
                                         zoneId,
-                                        timestamp
+                                        dropZoneCatalogVersion
                                 );
                                 failureProcessor.process(new FailureContext(e, 
errorMessage));
                             }
                         } else if (invokeResult) {
-                            LOG.info("Delete zone's dataNodes keys [zoneId = 
{}, timestamp = {}]", zoneId, timestamp);
+                            LOG.info("Delete zone's dataNodes keys [zoneId = 
{}, dropZoneCatalogVersion = {}]", zoneId,
+                                    dropZoneCatalogVersion);
                         } else {
-                            LOG.debug("Failed to delete zone's dataNodes keys 
[zoneId = {}, timestamp = {}]", zoneId, timestamp);
+                            LOG.debug("Failed to delete zone's dataNodes keys 
[zoneId = {}, dropZoneCatalogVersion = {}]", zoneId,
+                                    dropZoneCatalogVersion);
                         }
                     });
         } finally {
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index b0c686f9975..194ac46b6f6 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -51,6 +51,7 @@ import static 
org.apache.ignite.internal.metastorage.dsl.Statements.iif;
 import static 
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
 import static 
org.apache.ignite.internal.util.ByteUtils.longToBytesKeepingOrder;
 import static org.apache.ignite.internal.util.ByteUtils.uuidToBytes;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
@@ -803,7 +804,8 @@ public class DistributionZoneManager extends
         }));
 
         catalogManager.listen(ZONE_DROP, (DropZoneEventParameters parameters) 
-> inBusyLock(busyLock, () -> {
-            return onDropZoneBusy(parameters).thenApply((ignored) -> false);
+            onDropZoneBusy(parameters);
+            return falseCompletedFuture();
         }));
 
         catalogManager.listen(ZONE_ALTER, new 
ManagerCatalogAlterZoneEventListener());
@@ -891,14 +893,12 @@ public class DistributionZoneManager extends
         return nullCompletedFuture();
     }
 
-    private CompletableFuture<?> onDropZoneBusy(DropZoneEventParameters 
parameters) {
+    private void onDropZoneBusy(DropZoneEventParameters parameters) {
         unregisterMetricSource(parameters.zoneId());
+    }
 
-        long causalityToken = parameters.causalityToken();
-
-        HybridTimestamp timestamp = 
metaStorageManager.timestampByRevisionLocally(causalityToken);
-
-        return dataNodesManager.onZoneDrop(parameters.zoneId(), timestamp);
+    public CompletableFuture<?> onDropZoneDestroy(int zoneId, int 
dropZoneCatalogVersion) {
+        return dataNodesManager.onZoneDestroy(zoneId, dropZoneCatalogVersion);
     }
 
     private class ManagerCatalogAlterZoneEventListener extends 
CatalogAlterZoneEventListener {
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
index e0b4a550adb..c829c44ee6f 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerConfigurationChangesTest.java
@@ -73,21 +73,6 @@ public class DistributionZoneManagerConfigurationChangesTest 
extends BaseDistrib
         assertZonesKeysInMetaStorage(zoneId, nodes);
     }
 
-    @ParameterizedTest
-    @EnumSource(ConsistencyMode.class)
-    void testZoneDeleteRemovesMetaStorageKey(ConsistencyMode consistencyMode) 
throws Exception {
-        createZone(ZONE_NAME, consistencyMode);
-
-        int zoneId = getZoneId(ZONE_NAME);
-
-        assertDataNodesFromLogicalNodesInStorage(zoneId, nodes, 
keyValueStorage);
-
-        dropZone(ZONE_NAME);
-
-        // Data nodes should be removed from meta storage
-        assertZonesKeysInMetaStorage(zoneId, null, false);
-    }
-
     @ParameterizedTest
     @EnumSource(ConsistencyMode.class)
     void testSeveralZoneCreationsUpdatesTriggerKey(ConsistencyMode 
consistencyMode) throws Exception {
diff --git 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
index 421e926b745..36d48b5c091 100644
--- 
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
+++ 
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpScaleDownTest.java
@@ -26,7 +26,6 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTest
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertDataNodesInStorage;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.assertLogicalTopology;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createDefaultZone;
-import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.dataNodeHistoryContext;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.logicalNodeFromNode;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zonesLogicalTopologyKey;
@@ -46,7 +45,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
 import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
-import 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.DataNodesHistoryContext;
 import 
org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEvent;
 import 
org.apache.ignite.internal.distributionzones.events.HaZoneTopologyUpdateEventParams;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -221,70 +219,6 @@ public class DistributionZoneManagerScaleUpScaleDownTest 
extends BaseDistributio
         assertDataNodesFromLogicalNodesInStorage(defaultZone.id(), 
clusterNodes2, keyValueStorage);
     }
 
-    @Test
-    void testDropZoneDoNotPropagateDataNodesAfterScaleUp() throws Exception {
-        startDistributionZoneManager();
-
-        topology.putNode(NODE_A);
-
-        topology.putNode(NODE_B);
-
-        Set<LogicalNode> clusterNodes2 = Set.of(NODE_A, NODE_B);
-
-        assertLogicalTopology(clusterNodes2, keyValueStorage);
-
-        createZone(ZONE_NAME, IMMEDIATE_TIMER_VALUE, null, null);
-
-        int zoneId = getZoneId(ZONE_NAME);
-
-        assertDataNodesFromLogicalNodesInStorage(zoneId, clusterNodes2, 
keyValueStorage);
-
-        DataNodesHistoryContext context = 
dataNodeHistoryContext(metaStorageManager, zoneId);
-        assertTrue(context.scaleUpTimerPresent());
-        assertTrue(context.scaleDownTimerPresent());
-
-        dropZone(ZONE_NAME);
-
-        // Data nodes history should not be dropped after zone drop. Deferred 
removal should happen on LWM move.
-        assertDataNodesFromLogicalNodesInStorage(zoneId, clusterNodes2, 
keyValueStorage);
-        context = dataNodeHistoryContext(metaStorageManager, zoneId);
-        assertFalse(context.scaleUpTimerPresent());
-        assertFalse(context.scaleDownTimerPresent());
-    }
-
-    @Test
-    void testDropZoneDoNotPropagateDataNodesAfterScaleDown() throws Exception {
-        startDistributionZoneManager();
-
-        topology.putNode(NODE_A);
-
-        topology.putNode(NODE_B);
-
-        topology.removeNodes(Set.of(NODE_B));
-
-        Set<LogicalNode> clusterNodes2 = Set.of(NODE_A);
-
-        assertLogicalTopology(clusterNodes2, keyValueStorage);
-
-        createZone(ZONE_NAME, null, IMMEDIATE_TIMER_VALUE, null);
-
-        int zoneId = getZoneId(ZONE_NAME);
-
-        assertDataNodesFromLogicalNodesInStorage(zoneId, clusterNodes2, 
keyValueStorage);
-
-        DataNodesHistoryContext context = 
dataNodeHistoryContext(metaStorageManager, zoneId);
-        assertTrue(context.scaleUpTimerPresent());
-        assertTrue(context.scaleDownTimerPresent());
-
-        dropZone(ZONE_NAME);
-
-        // Data nodes history should not be dropped after zone drop. Deferred 
removal should happen on LWM move.
-        assertDataNodesFromLogicalNodesInStorage(zoneId, clusterNodes2, 
keyValueStorage);
-        context = dataNodeHistoryContext(metaStorageManager, zoneId);
-        assertFalse(context.scaleUpTimerPresent());
-        assertFalse(context.scaleDownTimerPresent());
-    }
-
     @Test
     void testEmptyDataNodesOnStart() throws Exception {
         startDistributionZoneManager();
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java
index 5ff838153ff..204bb01fdca 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskController.java
@@ -49,8 +49,8 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.table.LongPriorityQueue;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.LongPriorityQueue;
 
 /**
  * Component that reacts to certain Catalog changes and starts or stops 
corresponding {@link ChangeIndexStatusTask}s via the
diff --git 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index fc813940d31..cef5ae464b6 100644
--- 
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ 
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -59,11 +59,11 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
 import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.storage.index.IndexStorage;
-import org.apache.ignite.internal.table.LongPriorityQueue;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.PartitionSet;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.LongPriorityQueue;
 
 /**
  * An Ignite component that is responsible for handling index-related commands 
like CREATE or DROP as well as managing indexes' lifecycle.
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 8316ee50346..c4b66184794 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -29,6 +29,7 @@ import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_CREATE;
+import static org.apache.ignite.internal.catalog.events.CatalogEvent.ZONE_DROP;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.subtract;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.union;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES;
@@ -36,13 +37,18 @@ import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalan
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.assignmentsChainKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.extractZonePartitionId;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingChangeTriggerKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.plannedPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchAppendKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchReduceKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsChainGetLocally;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneAssignmentsGetLocally;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionAssignmentsGetLocally;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePendingAssignmentsGetLocally;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zoneStableAssignmentsGetLocally;
+import static org.apache.ignite.internal.event.EventListener.fromConsumer;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.LOGICAL_TIME_BITS_SIZE;
 import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.nullableHybridTimestamp;
@@ -64,6 +70,7 @@ import static 
org.apache.ignite.internal.tostring.IgniteToStringBuilder.COLLECTI
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockAsync;
@@ -97,6 +104,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfileDescr
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
 import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
+import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
 import org.apache.ignite.internal.components.NodeProperties;
 import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
 import 
org.apache.ignite.internal.configuration.utils.SystemDistributedConfigurationPropertyHolder;
@@ -118,6 +126,8 @@ import 
org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.lowwatermark.LowWatermark;
+import 
org.apache.ignite.internal.lowwatermark.event.ChangeLowWatermarkEventParameters;
+import org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import org.apache.ignite.internal.manager.ComponentContext;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.metastorage.Entry;
@@ -173,10 +183,12 @@ import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.TransactionStateResolver;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
 import org.apache.ignite.internal.tx.storage.state.TxStatePartitionStorage;
+import 
org.apache.ignite.internal.tx.storage.state.TxStateStorageRebalanceException;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
 import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.LongPriorityQueue;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.util.TrackerClosedException;
 import org.jetbrains.annotations.Nullable;
@@ -280,12 +292,18 @@ public class PartitionReplicaLifecycleManager extends
 
     private final EventListener<CreateZoneEventParameters> 
onCreateZoneListener = this::onCreateZone;
     private final EventListener<PrimaryReplicaEventParameters> 
onPrimaryReplicaExpiredListener = this::onPrimaryReplicaExpired;
+    private final EventListener<DropZoneEventParameters> onZoneDropListener = 
fromConsumer(this::onZoneDrop);
+
+    private final EventListener<ChangeLowWatermarkEventParameters> 
onLowWatermarkChangedListener = this::onLwmChanged;
 
     /**
      * This future completes on {@link #beforeNodeStop()} with {@link 
NodeStoppingException} before the {@link #busyLock} is blocked.
      */
     private final CompletableFuture<Void> stopReplicaLifecycleFuture = new 
CompletableFuture<>();
 
+    private final LongPriorityQueue<DestroyZoneEvent> destructionEventsQueue =
+            new LongPriorityQueue<>(DestroyZoneEvent::catalogVersion);
+
     /**
      * The constructor.
      *
@@ -457,7 +475,7 @@ public class PartitionReplicaLifecycleManager extends
         assert recoveryFinishFuture.isDone();
         long recoveryRevision = recoveryFinishFuture.join().revision();
 
-        cleanUpResourcesForDroppedZonesOnRecovery();
+        handleResourcesForDroppedZonesOnRecovery();
 
         HybridTimestamp safeLwm = catalogSafeLowWatermark(lowWatermark, 
catalogService);
         CompletableFuture<Void> processZonesAndAssignmentsOnStart = 
processZonesOnStart(recoveryRevision, safeLwm)
@@ -468,6 +486,9 @@ public class PartitionReplicaLifecycleManager extends
         metaStorageMgr.registerPrefixWatch(new 
ByteArray(ASSIGNMENTS_SWITCH_REDUCE_PREFIX_BYTES), 
assignmentsSwitchRebalanceListener);
 
         catalogService.listen(ZONE_CREATE, onCreateZoneListener);
+        catalogService.listen(ZONE_DROP, onZoneDropListener);
+
+        lowWatermark.listen(LowWatermarkEvent.LOW_WATERMARK_CHANGED, 
onLowWatermarkChangedListener);
 
         rebalanceRetryDelayConfiguration.init();
 
@@ -495,12 +516,34 @@ public class PartitionReplicaLifecycleManager extends
         var startedZones = new IntOpenHashSet();
         var startZoneFutures = new ArrayList<CompletableFuture<?>>();
 
+        Catalog nextCatalog = null;
+
         for (int ver = latestCatalogVersion; ver >= earliestCatalogVersion; 
ver--) {
+            Catalog catalog = catalogService.catalog(ver);
+            Catalog finalNextCatalog = nextCatalog;
+
             int ver0 = ver;
             catalogService.catalog(ver).zones().stream()
                     .filter(zone -> startedZones.add(zone.id()))
-                    .forEach(zoneDescriptor -> startZoneFutures.add(
-                            
calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0, 
zoneDescriptor, true)));
+                    .forEach(zoneDescriptor -> {
+                        int zoneId = zoneDescriptor.id();
+
+                        startZoneFutures.add(
+                                
calculateZoneAssignmentsAndCreateReplicationNodes(recoveryRevision, ver0, 
zoneDescriptor, true));
+
+                        // Handle missed zone drop event.
+                        if (finalNextCatalog != null && 
finalNextCatalog.zone(zoneId) == null) {
+                            destructionEventsQueue.enqueue(
+                                    new DestroyZoneEvent(
+                                            finalNextCatalog.version(),
+                                            zoneId,
+                                            zoneDescriptor.partitions()
+                                    )
+                            );
+                        }
+                    });
+
+            nextCatalog = catalog;
         }
 
         return allOf(startZoneFutures.toArray(CompletableFuture[]::new))
@@ -570,8 +613,8 @@ public class PartitionReplicaLifecycleManager extends
         }
     }
 
-    private void cleanUpResourcesForDroppedZonesOnRecovery() {
-        // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones 
from vault and metastore
+    private void handleResourcesForDroppedZonesOnRecovery() {
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-27466 Handle 
abandoned resources for dropped zones from vault and metastore
     }
 
     private CompletableFuture<Boolean> onCreateZone(CreateZoneEventParameters 
createZoneEventParameters) {
@@ -927,7 +970,10 @@ public class PartitionReplicaLifecycleManager extends
 
         
executorInclinedPlacementDriver.removeListener(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED,
 onPrimaryReplicaExpiredListener);
 
+        lowWatermark.removeListener(LowWatermarkEvent.LOW_WATERMARK_CHANGED, 
onLowWatermarkChangedListener);
+
         catalogService.removeListener(ZONE_CREATE, onCreateZoneListener);
+        catalogService.removeListener(ZONE_DROP, onZoneDropListener);
 
         metaStorageMgr.unregisterWatch(pendingAssignmentsRebalanceListener);
         metaStorageMgr.unregisterWatch(stableAssignmentsRebalanceListener);
@@ -1669,7 +1715,7 @@ public class PartitionReplicaLifecycleManager extends
             Assignments pendingAssignments,
             long revision
     ) {
-        Entry reduceEntry = 
metaStorageMgr.getLocally(ZoneRebalanceUtil.switchReduceKey(replicaGrpId), 
revision);
+        Entry reduceEntry = 
metaStorageMgr.getLocally(switchReduceKey(replicaGrpId), revision);
 
         Assignments reduceAssignments = reduceEntry != null
                 ? Assignments.fromBytes(reduceEntry.value())
@@ -2045,6 +2091,174 @@ public class PartitionReplicaLifecycleManager extends
         return zoneResourcesManager.getZonePartitionResources(zonePartitionId);
     }
 
+    private void onZoneDrop(DropZoneEventParameters parameters) {
+        inBusyLock(busyLock, () -> {
+            int eventCatalogVersion = parameters.catalogVersion();
+            int catalogVersionWithZonePresent = eventCatalogVersion - 1;
+            int zoneId = parameters.zoneId();
+
+            CatalogZoneDescriptor zoneDescriptor = 
catalogService.catalog(catalogVersionWithZonePresent).zone(zoneId);
+
+            assert zoneDescriptor != null : "Unexpected null zone descriptor 
for zoneId=" + zoneId + ", catalogVersion "
+                    + catalogVersionWithZonePresent;
+
+            destructionEventsQueue.enqueue(
+                    new DestroyZoneEvent(
+                            eventCatalogVersion,
+                            zoneId,
+                            zoneDescriptor.partitions()
+                    )
+            );
+        });
+    }
+
+    // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not 
"thread-safe" in case of concurrent disaster recovery or rebalances.
+    private CompletableFuture<Boolean> 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
+        if (!busyLock.enterBusy()) {
+            return falseCompletedFuture();
+        }
+
+        try {
+            int newEarliestCatalogVersion = 
catalogService.activeCatalogVersion(parameters.newLowWatermark().longValue());
+
+            // Run zone destruction fully asynchronously.
+            destructionEventsQueue.drainUpTo(newEarliestCatalogVersion)
+                    .forEach(this::removeZonePartitionsIfPossible);
+
+            return falseCompletedFuture();
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
+    /**
+     * For each partition performs following actions.
+     *
+     * <ol>
+     *     <li>Check whether it's started or await if it's starting.</li>
+     *     <li>Check whether the partition is eligible for removal, i.e. it has zero 
table resources and empty txStateStorage.</li>
+     *     <li>Stop partition, destroy zone partition resources and unregister 
it from within startedReplicationGroups.</li>
+     *     <li>Remove partition assignments from meta storage.</li>
+     * </ol>
+     */
+    private void removeZonePartitionsIfPossible(DestroyZoneEvent event) {
+        int zoneId = event.zoneId();
+        int partitionsCount = event.partitions();
+
+        List<CompletableFuture<Boolean>> 
partitionsEligibilityForRemovalFutures = new ArrayList<>();
+        for (int partitionIndex = 0; partitionIndex < partitionsCount; 
partitionIndex++) {
+            ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 
partitionIndex);
+
+            CompletableFuture<Boolean> partitionRemovalFuture =
+                    
startedReplicationGroups.hasReplicationGroupStartedOrAwaitIfStarting(zonePartitionId)
+                            .thenComposeAsync(started -> {
+                                if (!started) {
+                                    return falseCompletedFuture();
+                                }
+
+                                return inBusyLockAsync(busyLock, () ->
+                                        isEligibleForDrop(zonePartitionId)
+                                                .thenCompose(eligible -> {
+                                                    if (!eligible) {
+                                                        return 
falseCompletedFuture();
+                                                    }
+
+                                                    // It's safe to use -1 as 
revision id here, since we only stop partitions that do not
+                                                    // have active 
table-related resources.
+                                                    return 
stopAndDestroyPartition(zonePartitionId, -1)
+                                                            .thenCompose(v -> 
dropAssignments(zonePartitionId))
+                                                            .thenApply(v -> 
true);
+                                                })
+                                );
+                            }, partitionOperationsExecutor)
+                            .exceptionally(e -> {
+                                if (!hasCause(e, NodeStoppingException.class)) 
{
+                                    LOG.error(
+                                            "Unable to destroy zone partition 
[zonePartitionId={}]",
+                                            e,
+                                            zonePartitionId);
+                                }
+
+                                // In case of false, event will be returned to 
the destructionEventsQueue and thus removal will be retried
+                                // on next iteration of LWM change.
+                                return false;
+                            });
+
+            partitionsEligibilityForRemovalFutures.add(partitionRemovalFuture);
+        }
+
+        // If there's a partition that still has non-empty resources, e.g. 
non-empty txnStateStorage, the event is returned
+        // back to destructionEventsQueue and thus will be re-processed on 
next lwm change.
+        allOf(partitionsEligibilityForRemovalFutures.toArray(new 
CompletableFuture[0]))
+                .thenApply(fs -> 
partitionsEligibilityForRemovalFutures.stream().anyMatch(f -> !f.join()))
+                .thenAccept(anyFalse -> {
+                    if (anyFalse) {
+                        destructionEventsQueue.enqueue(event);
+                    } else {
+                        distributionZoneMgr.onDropZoneDestroy(zoneId, 
event.catalogVersion)
+                                .whenComplete((r, e) -> {
+                                    if (e != null) {
+                                        LOG.error(
+                                                "Unable to destroy zone 
resources [zoneId={}]",
+                                                e,
+                                                zoneId);
+                                    }
+                                });
+                    }
+                });
+    }
+
+    /**
+     * Checks whether partition could be removed. In order to match the 
condition, a zone partition should have zero table resources and
+     * empty txStateStorage.
+     */
+    private CompletableFuture<Boolean> isEligibleForDrop(ZonePartitionId 
zonePartitionId) {
+        ZonePartitionResources zonePartitionResources = 
zoneResourcesManager.getZonePartitionResources(zonePartitionId);
+
+        if (zonePartitionResources == null) {
+            return trueCompletedFuture();
+        }
+
+        try (var cursor = 
zonePartitionResources.txStatePartitionStorage().scan()) {
+            if (cursor.hasNext()) {
+                return falseCompletedFuture();
+            }
+        } catch (TxStateStorageRebalanceException e) {
+            return falseCompletedFuture();
+        }
+
+        // In order to simplify the logic of table resources cleanup, that is 
handled by TableManager and
+        // zone resources cleanup that is handled by 
PartitionReplicaLifecycleManager, on zone destruction event
+        // we await TableManager to finish its own job instead of stealing it.
+        // In case when both tables and zone were destroyed on the same lwm, 
PartitionReplicaLifecycleManager will cleanup
+        // zone resources within the next lwm update.
+        return zoneResourcesManager.areTableResourcesEmpty(zonePartitionId);
+    }
+
+    /**
+     * Removes all zone partition assignment keys from metastorage.
+     */
+    private CompletableFuture<Void> dropAssignments(ZonePartitionId 
zonePartitionId) {
+        Set<ByteArray> assignmentKeys = Set.of(
+                stablePartAssignmentsKey(zonePartitionId),
+                pendingPartAssignmentsQueueKey(zonePartitionId),
+                pendingChangeTriggerKey(zonePartitionId),
+                plannedPartAssignmentsKey(zonePartitionId),
+                switchAppendKey(zonePartitionId),
+                switchReduceKey(zonePartitionId),
+                assignmentsChainKey(zonePartitionId)
+        );
+
+        return metaStorageMgr.removeAll(assignmentKeys)
+                .whenComplete((v, e) -> {
+                    if (e != null) {
+                        LOG.error("Failed to remove assignments from 
metastorage [zonePartitionId={}]", e, zonePartitionId);
+                    }
+                });
+    }
+
     /**
      * For HA zones: Check that last rebalance was graceful (caused by common 
rebalance triggers, like data nodes change, replica factor
      * change, etc.) rather than forced (caused by a disaster recovery reset 
after losing the majority of nodes).
@@ -2054,6 +2268,35 @@ public class PartitionReplicaLifecycleManager extends
         return assignmentsChain == null || assignmentsChain.size() == 1;
     }
 
+    /** Internal event. */
+    private static class DestroyZoneEvent {
+        final int catalogVersion;
+        final int zoneId;
+        final int partitions;
+
+        DestroyZoneEvent(
+                int catalogVersion,
+                int zoneId,
+                int partitions
+        ) {
+            this.catalogVersion = catalogVersion;
+            this.zoneId = zoneId;
+            this.partitions = partitions;
+        }
+
+        int catalogVersion() {
+            return catalogVersion;
+        }
+
+        int zoneId() {
+            return zoneId;
+        }
+
+        int partitions() {
+            return partitions;
+        }
+    }
+
     /**
      * Factory for creating table partition replica processors.
      */
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
index cb66641923d..35816ab71cf 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/StartedReplicationGroups.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.partition.replicator;
 
 import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 
 import java.util.Map;
 import java.util.Set;
@@ -100,6 +102,23 @@ class StartedReplicationGroups {
         return startedReplicationGroupIds.contains(zonePartitionId);
     }
 
+    /**
+     * Returns a future completed with {@code true} if the group has already started, or awaits 
the group's startup if it is in the middle of the start process;
+     * otherwise returns a future completed with {@code false}.
+     */
+    CompletableFuture<Boolean> 
hasReplicationGroupStartedOrAwaitIfStarting(ZonePartitionId zonePartitionId) {
+        if (startedReplicationGroupIds.contains(zonePartitionId)) {
+            return trueCompletedFuture();
+        }
+
+        CompletableFuture<Void> groupStartingFuture = 
startingReplicationGroupIds.get(zonePartitionId);
+        if (groupStartingFuture != null) {
+            return groupStartingFuture.thenCompose(ignored -> 
trueCompletedFuture());
+        }
+
+        return falseCompletedFuture();
+    }
+
     private void completeStartingFuture(ZonePartitionId zonePartitionId) {
         CompletableFuture<Void> startingFuture = 
startingReplicationGroupIds.remove(zonePartitionId);
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 61f8e95e3cf..860204fea6e 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -330,6 +330,13 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
         return processor == null ? null : processor.txRwOperationTracker();
     }
 
+    /**
+     * Returns true if there are no table replica processors, false otherwise.
+     */
+    boolean areTableReplicaProcessorsEmpty() {
+        return replicaProcessors.isEmpty();
+    }
+
     /**
      * Return table replicas listeners.
      *
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
index c32da64d051..552984b8784 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.partition.replicator;
 
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
 import java.util.Map;
@@ -211,6 +212,24 @@ public class ZoneResourcesManager implements 
ManuallyCloseable {
                 });
     }
 
+    /**
+     *  Returns future of true if there are no corresponding table-related 
resources, otherwise awaits replicaListenerFuture
+     *  and checks whether table replica processors, table raft processors and 
partition snapshot storages are present.
+     *  If any is present, returns false; otherwise returns true.
+     */
+    CompletableFuture<Boolean> areTableResourcesEmpty(ZonePartitionId 
zonePartitionId) {
+        ZonePartitionResources resources = 
getZonePartitionResources(zonePartitionId);
+
+        if (resources == null) {
+            return trueCompletedFuture();
+        }
+
+        return resources.replicaListenerFuture
+                .thenApply(zoneReplicaListener -> 
zoneReplicaListener.areTableReplicaProcessorsEmpty()
+                        && 
resources.raftListener().areTableRaftProcessorsEmpty()
+                        && 
resources.snapshotStorage().arePartitionSnapshotStoragesEmpty());
+    }
+
     @TestOnly
     @Nullable
     TxStatePartitionStorage txStatePartitionStorage(int zoneId, int 
partitionId) {
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
index e9117c93503..3a2114ae7ee 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java
@@ -491,6 +491,15 @@ public class ZonePartitionRaftListener implements 
RaftGroupListener {
         }
     }
 
+    /**
+     * Returns true if there are no table processors, false otherwise.
+     */
+    public boolean areTableRaftProcessorsEmpty() {
+        synchronized (tableProcessorsStateLock) {
+            return tableProcessors.isEmpty();
+        }
+    }
+
     private void cleanupSnapshots() {
         partitionsSnapshots.cleanupOutgoingSnapshots(partitionKey);
     }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
index 705c598a1ef..c98b3315643 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionSnapshotStorage.java
@@ -205,6 +205,15 @@ public class PartitionSnapshotStorage {
         }
     }
 
+    /**
+     * Returns true if there are neither ongoing snapshot operations nor 
partition snapshot storages, false otherwise.
+     */
+    public boolean arePartitionSnapshotStoragesEmpty() {
+        synchronized (snapshotOperationLock) {
+            return ongoingSnapshotOperations.isEmpty() && 
partitionsByTableId.isEmpty();
+        }
+    }
+
     /**
      * Returns the TX state storage.
      */
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
index 5c92b69c99c..f16bc19ca4f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/statistic/SqlStatisticManagerImpl.java
@@ -55,9 +55,9 @@ import 
org.apache.ignite.internal.lowwatermark.event.LowWatermarkEvent;
 import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticChangedEvent;
 import 
org.apache.ignite.internal.sql.engine.statistic.event.StatisticEventParameters;
 import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.LongPriorityQueue;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.LongPriorityQueue;
 import org.jetbrains.annotations.TestOnly;
 
 /**
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java
index b448cc4f164..a63274710d0 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/partition/ItPartitionDestructionTest.java
@@ -23,8 +23,18 @@ import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneDataNodesHistoryKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleDownTimerKey;
+import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.zoneScaleUpTimerKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.assignmentsChainKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingChangeTriggerKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.plannedPartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchAppendKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.switchReduceKey;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.hasSize;
@@ -32,6 +42,7 @@ import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.io.FileMatchers.anExistingFile;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.io.File;
 import java.nio.ByteBuffer;
@@ -53,17 +64,19 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.configuration.IgnitePaths;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lowwatermark.LowWatermarkImpl;
+import org.apache.ignite.internal.metastorage.Entry;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.TokenizedAssignments;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.RaftNodeId;
 import org.apache.ignite.internal.replicator.PartitionGroupId;
-import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.sql.engine.util.SqlTestUtils;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.testframework.SystemPropertiesExtension;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
+import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
 import 
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbPartitionStorage;
 import org.apache.ignite.internal.vault.VaultService;
 import org.apache.ignite.internal.vault.persistence.PersistentVaultService;
@@ -137,7 +150,6 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24345";)
     void partitionIsDestroyedOnZoneDestruction() throws Exception {
         cluster.startAndInit(1, 
ItPartitionDestructionTest::aggressiveLowWatermarkIncrease);
 
@@ -151,10 +163,23 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
 
         makeSurePartitionExistsOnDisk(ignite0, tableId, replicationGroupId);
 
+        verifyZoneDistributionZoneManagerResourcesArePresent(ignite0, 
replicationGroupId.zoneId());
+
         executeUpdate("DROP TABLE " + TABLE_NAME);
         executeUpdate("DROP ZONE " + ZONE_NAME);
 
+        verifyPartitionMvDataGetsRemovedFromDisk(ignite0, tableId, 
replicationGroupId);
+
+        verifyPartitionNonMvDataExistsOnDisk(ignite0, replicationGroupId);
+
+        // Trigger txStateStorage vacuumization that will remove all records 
from the storage and thus make it eligible for removal.
+        
assertThat(ignite0.txManager().vacuum(mock(ResourceVacuumMetrics.class)), 
willCompleteSuccessfully());
+
         verifyPartitionGetsFullyRemovedFromDisk(ignite0, tableId, 
replicationGroupId);
+
+        verifyAssignmentKeysWereRemovedFromMetaStorage(ignite0, 
replicationGroupId);
+
+        
verifyZoneDistributionZoneManagerResourcesWereRemovedFromMetaStorage(ignite0, 
replicationGroupId.zoneId());
     }
 
     /**
@@ -208,8 +233,17 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-24345";)
-    void partitionIsDestroyedOnZoneDestructionOnNodeRecovery() throws 
Exception {
+    public void 
partitionIsDestroyedOnZoneDestructionOnNodeRecoveryIfLwmIsNotMovedWhileNodeIsAbsent()
 throws Exception {
+        verifyPartitionIsDestroyedOnZoneDestructionOnNodeRecovery(false);
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-27466";)
+    public void 
partitionIsDestroyedOnZoneDestructionOnNodeRecoveryIfLwmIsMovedWhileNodeIsAbsent()
 throws Exception {
+        verifyPartitionIsDestroyedOnZoneDestructionOnNodeRecovery(true);
+    }
+
+    private void 
verifyPartitionIsDestroyedOnZoneDestructionOnNodeRecovery(boolean 
moveLwmWhileNodeIsAbsent) throws Exception {
         cluster.startAndInit(1, 
ItPartitionDestructionTest::aggressiveLowWatermarkIncrease);
         IgniteImpl ignite0 = unwrapIgniteImpl(cluster.node(0));
         Path workDir0 = ((IgniteServerImpl) cluster.server(0)).workDir();
@@ -234,15 +268,28 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
         executeUpdate("DROP ZONE " + ZONE_NAME);
         HybridTimestamp tsAfterDrop = latestCatalogVersionTs(ignite0);
 
+        verifyPartitionNonMvDataExistsOnDisk(ignite0, replicationGroupId);
+
+        verifyZoneDistributionZoneManagerResourcesArePresent(ignite0, 
replicationGroupId.zoneId());
+
+        // Trigger txStateStorage vacuumization that will remove all records 
from the storage and thus make it eligible for removal.
+        
assertThat(ignite0.txManager().vacuum(mock(ResourceVacuumMetrics.class)), 
willCompleteSuccessfully());
+
         cluster.stopNode(0);
 
-        // Simulate a situation when an LWM was raised (and persisted) and the 
node was stopped immediately, so we were not
-        // able to destroy the dropped zone yet, even though its drop moment 
is already under the LWM.
-        raisePersistedLwm(workDir0, tsAfterDrop.tick());
+        if (moveLwmWhileNodeIsAbsent) {
+            // Simulate a situation when an LWM was raised (and persisted) and 
the node was stopped immediately, so we were not
+            // able to destroy the dropped zone yet, even though its drop 
moment is already under the LWM.
+            raisePersistedLwm(workDir0, tsAfterDrop.tick());
+        }
 
         IgniteImpl restartedIgnite0 = unwrapIgniteImpl(cluster.startNode(0));
 
         verifyPartitionGetsFullyRemovedFromDisk(restartedIgnite0, tableId, 
replicationGroupId);
+
+        verifyAssignmentKeysWereRemovedFromMetaStorage(restartedIgnite0, 
replicationGroupId);
+
+        
verifyZoneDistributionZoneManagerResourcesWereRemovedFromMetaStorage(restartedIgnite0,
 replicationGroupId.zoneId());
     }
 
     /**
@@ -371,7 +418,7 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
         return table.zoneId();
     }
 
-    private static void makeSurePartitionExistsOnDisk(IgniteImpl ignite, int 
tableId, PartitionGroupId replicationGroupId) {
+    private static void makeSurePartitionExistsOnDisk(IgniteImpl ignite, int 
tableId, ZonePartitionId replicationGroupId) {
         makeSurePartitionMvDataExistsOnDisk(ignite, tableId);
 
         assertTrue(hasSomethingInTxStateStorage(ignite, replicationGroupId));
@@ -428,20 +475,20 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
                 .array();
     }
 
-    private static File partitionRaftMetaFile(IgniteImpl ignite, 
ReplicationGroupId replicationGroupId) {
+    private static File partitionRaftMetaFile(IgniteImpl ignite, 
ZonePartitionId replicationGroupId) {
         String relativePath = partitionRaftNodeIdStringForStorage(ignite, 
replicationGroupId) + "/meta/raft_meta";
         Path raftMetaFilePath = 
ignite.partitionsWorkDir().metaPath().resolve(relativePath);
         return raftMetaFilePath.toFile();
     }
 
-    private static LogStorage partitionLogStorage(IgniteImpl ignite, 
ReplicationGroupId replicationGroupId) {
+    private static LogStorage partitionLogStorage(IgniteImpl ignite, 
ZonePartitionId replicationGroupId) {
         return ignite.partitionsLogStorageFactory().createLogStorage(
                 partitionRaftNodeIdStringForStorage(ignite, 
replicationGroupId),
                 new RaftOptions()
         );
     }
 
-    private static String partitionRaftNodeIdStringForStorage(IgniteImpl 
ignite, ReplicationGroupId replicationGroupId) {
+    private static String partitionRaftNodeIdStringForStorage(IgniteImpl 
ignite, ZonePartitionId replicationGroupId) {
         RaftNodeId partitionRaftNodeId = new RaftNodeId(replicationGroupId, 
new Peer(ignite.name()));
         return partitionRaftNodeId.nodeIdStringForStorage();
     }
@@ -453,7 +500,7 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
     private static void verifyPartitionGetsFullyRemovedFromDisk(
             IgniteImpl ignite,
             int tableId,
-            PartitionGroupId replicationGroupId
+            ZonePartitionId replicationGroupId
     ) throws InterruptedException {
         verifyPartitionGetsRemovedFromDisk(ignite, tableId, 
replicationGroupId, true);
     }
@@ -461,7 +508,7 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
     private static void verifyPartitionMvDataGetsRemovedFromDisk(
             IgniteImpl ignite,
             int tableId,
-            PartitionGroupId replicationGroupId
+            ZonePartitionId replicationGroupId
     ) throws InterruptedException {
         verifyPartitionGetsRemovedFromDisk(ignite, tableId, 
replicationGroupId, false);
     }
@@ -469,39 +516,63 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
     private static void verifyPartitionGetsRemovedFromDisk(
             IgniteImpl ignite,
             int tableId,
-            PartitionGroupId replicationGroupId,
+            ZonePartitionId replicationGroupId,
             boolean concernNonMvData
     ) throws InterruptedException {
         File partitionFile = testTablePartition0File(ignite, tableId);
-        assertTrue(
-                waitForCondition(() -> !partitionFile.exists(), 
SECONDS.toMillis(10)),
-                "Partition file " + partitionFile.getAbsolutePath() + " was 
not removed in time"
-        );
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Partition file " + 
partitionFile.getAbsolutePath() + " was not removed in time.",
+                            !partitionFile.exists()
+                    );
+                });
 
         if (concernNonMvData) {
-            assertTrue(
-                    waitForCondition(() -> 
!hasSomethingInTxStateStorage(ignite, replicationGroupId), 
SECONDS.toMillis(10)),
-                    "Tx state storage was not destroyed in time"
-            );
-
-            assertTrue(
-                    waitForCondition(() -> partitionLogStorage(ignite, 
replicationGroupId).getLastLogIndex() == 0L, SECONDS.toMillis(10)),
-                    "Partition Raft log was not removed in time"
-            );
+            await().atMost(10, SECONDS)
+                    .untilAsserted(() -> {
+                        assertThat(
+                                "Tx state storage was not destroyed in time.",
+                                !hasSomethingInTxStateStorage(ignite, 
replicationGroupId)
+                        );
+                    });
+
+            await().atMost(10, SECONDS)
+                    .untilAsserted(() -> {
+                        assertThat(
+                                "Partition Raft log was not removed in time.",
+                                partitionLogStorage(ignite, 
replicationGroupId).getLastLogIndex(), is(0L)
+                        );
+                    });
 
             File raftMetaFile = partitionRaftMetaFile(ignite, 
replicationGroupId);
-            assertTrue(
-                    waitForCondition(() -> !raftMetaFile.exists(), 
SECONDS.toMillis(10)),
-                    "Partition Raft meta file " + 
raftMetaFile.getAbsolutePath() + " was not removed in time"
-            );
+            await().atMost(10, SECONDS)
+                    .untilAsserted(() -> {
+                        assertThat(
+                                "Partition Raft meta file " + 
raftMetaFile.getAbsolutePath() + " was not removed in time.",
+                                !raftMetaFile.exists()
+                        );
+                    });
         }
     }
 
+    private static void verifyPartitionNonMvDataExistsOnDisk(IgniteImpl 
ignite, ZonePartitionId replicationGroupId) {
+        assertTrue(hasSomethingInTxStateStorage(ignite, replicationGroupId), 
"Tx state storage was unexpectedly destroyed");
+
+        assertTrue(partitionLogStorage(ignite, 
replicationGroupId).getLastLogIndex() > 0L, "Partition Raft log was 
unexpectedly removed.");
+
+        File raftMetaFile = partitionRaftMetaFile(ignite, replicationGroupId);
+        assertTrue(raftMetaFile.exists(), "Partition Raft meta file " + 
raftMetaFile.getAbsolutePath() + " was unexpectedly removed.");
+    }
+
     private static void waitTillCatalogDoesNotContainTargetTable(IgniteImpl 
ignite, String tableName) throws InterruptedException {
-        assertTrue(
-                waitForCondition(() -> 
!catalogHistoryContainsTargetTable(ignite, tableName), SECONDS.toMillis(10)),
-                "Did not observe catalog truncation in time"
-        );
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Did not observe catalog truncation in time.",
+                            !catalogHistoryContainsTargetTable(ignite, 
tableName)
+                    );
+                });
     }
 
     private static boolean catalogHistoryContainsTargetTable(IgniteImpl 
ignite, String tableName) {
@@ -550,15 +621,18 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
         verifyPartitionGetsFullyRemovedFromDisk(notHostingIgnite, tableId, 
replicationGroupId);
     }
 
-    private void waitTillAssignmentCountReaches(int targetAssignmentCount, 
ReplicationGroupId replicationGroupId)
+    private void waitTillAssignmentCountReaches(int targetAssignmentCount, 
ZonePartitionId replicationGroupId)
             throws InterruptedException {
-        assertTrue(
-                waitForCondition(() -> 
partitionAssignments(replicationGroupId).size() == targetAssignmentCount, 
SECONDS.toMillis(10)),
-                "Did not see assignments count reaching " + 
targetAssignmentCount
-        );
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Did not see assignments count reaching " + 
targetAssignmentCount + ".",
+                            partitionAssignments(replicationGroupId).size() == 
targetAssignmentCount
+                    );
+                });
     }
 
-    private Set<Assignment> partitionAssignments(ReplicationGroupId 
replicationGroupId) {
+    private Set<Assignment> partitionAssignments(ZonePartitionId 
replicationGroupId) {
         IgniteImpl ignite = unwrapIgniteImpl(cluster.aliveNode());
 
         CompletableFuture<TokenizedAssignments> assignmentsFuture = 
ignite.placementDriver()
@@ -571,7 +645,7 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
         return assignments.nodes();
     }
 
-    private IgniteImpl nodeNotHostingPartition(ReplicationGroupId 
replicationGroupId) throws InterruptedException {
+    private IgniteImpl nodeNotHostingPartition(ZonePartitionId 
replicationGroupId) throws InterruptedException {
         waitTillAssignmentCountReaches(1, replicationGroupId);
 
         Set<Assignment> assignments = partitionAssignments(replicationGroupId);
@@ -584,4 +658,129 @@ class ItPartitionDestructionTest extends 
ClusterPerTestIntegrationTest {
                 .map(TestWrappers::unwrapIgniteImpl)
                 .orElseThrow();
     }
+
+    private static void 
verifyAssignmentKeysWereRemovedFromMetaStorage(IgniteImpl ignite, 
ZonePartitionId zonePartitionId)
+            throws InterruptedException {
+        MetaStorageManager metaStorage = 
unwrapIgniteImpl(ignite).metaStorageManager();
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    Entry entry = 
metaStorage.getLocally(stablePartAssignmentsKey(zonePartitionId));
+                    assertThat(
+                            "Stable assignments were not removed from meta 
storage in time.",
+                            entry.tombstone() || entry.empty()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    Entry entry = 
metaStorage.getLocally(pendingPartAssignmentsQueueKey(zonePartitionId));
+                    assertThat(
+                            "Pending assignments were not removed from meta 
storage in time.",
+                            entry.tombstone() || entry.empty()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    Entry entry = 
metaStorage.getLocally(pendingChangeTriggerKey(zonePartitionId));
+                    assertThat(
+                            "Pending change trigger key was not removed from 
meta storage in time.",
+                            entry.tombstone() || entry.empty()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    Entry entry = 
metaStorage.getLocally(plannedPartAssignmentsKey(zonePartitionId));
+                    assertThat(
+                            "Planned assignments were not removed from meta 
storage in time.",
+                            entry.tombstone() || entry.empty()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    Entry entry = 
metaStorage.getLocally(switchAppendKey(zonePartitionId));
+                    assertThat(
+                            "Switch append assignments were not removed from 
meta storage in time.",
+                            entry.tombstone() || entry.empty()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    Entry entry = 
metaStorage.getLocally(switchReduceKey(zonePartitionId));
+                    assertThat(
+                            "Switch reduce assignments were not removed from 
meta storage in time.",
+                            entry.tombstone() || entry.empty()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    Entry entry = 
metaStorage.getLocally(assignmentsChainKey(zonePartitionId));
+                    assertThat(
+                            "Assignments chain was not removed from meta 
storage in time.",
+                            entry.tombstone() || entry.empty()
+                    );
+                });
+    }
+
+    private static void 
verifyZoneDistributionZoneManagerResourcesArePresent(IgniteImpl ignite, int 
zoneId) throws InterruptedException {
+        MetaStorageManager metaStorage = 
unwrapIgniteImpl(ignite).metaStorageManager();
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Zone data nodes are not present in meta storage.",
+                            
!metaStorage.getLocally(zoneDataNodesHistoryKey(zoneId)).empty()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Zone scale up timer is not present in meta 
storage.",
+                            
!metaStorage.getLocally(zoneScaleUpTimerKey(zoneId)).empty()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Zone scale down timer is not present in meta 
storage.",
+                            
!metaStorage.getLocally(zoneScaleDownTimerKey(zoneId)).empty()
+                    );
+                });
+    }
+
+    private static void 
verifyZoneDistributionZoneManagerResourcesWereRemovedFromMetaStorage(IgniteImpl 
ignite, int zoneId)
+            throws InterruptedException {
+        MetaStorageManager metaStorage = 
unwrapIgniteImpl(ignite).metaStorageManager();
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Zone data nodes were not removed from meta 
storage in time.",
+                            
metaStorage.getLocally(zoneDataNodesHistoryKey(zoneId)).tombstone()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Zone scale up timer was not removed from meta 
storage in time.",
+                            
metaStorage.getLocally(zoneScaleUpTimerKey(zoneId)).tombstone()
+                    );
+                });
+
+        await().atMost(10, SECONDS)
+                .untilAsserted(() -> {
+                    assertThat(
+                            "Zone scale down timer was not removed from meta 
storage in time.",
+                            
metaStorage.getLocally(zoneScaleDownTimerKey(zoneId)).tombstone()
+                    );
+                });
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 6cb27eb4729..2743476c67e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -28,9 +28,7 @@ import static 
java.util.concurrent.CompletableFuture.supplyAsync;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.causality.IncrementalVersionedValue.dependingOn;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
 import static org.apache.ignite.internal.event.EventListener.fromConsumer;
 import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_DESTROYED;
 import static 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED;
@@ -77,7 +75,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Function;
 import java.util.function.Supplier;
-import java.util.stream.IntStream;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
 import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
@@ -105,7 +102,6 @@ import org.apache.ignite.internal.failure.FailureProcessor;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.hlc.HybridTimestampTracker;
-import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
@@ -158,7 +154,6 @@ import 
org.apache.ignite.internal.storage.engine.StorageTableDescriptor;
 import 
org.apache.ignite.internal.storage.metrics.StorageEngineTablesMetricSource;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.table.LongPriorityQueue;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TableImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
@@ -190,6 +185,7 @@ import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.LongPriorityQueue;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.sql.IgniteSql;
@@ -978,6 +974,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         }
     }
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-27468 Not 
"thread-safe" in case of concurrent disaster recovery or rebalances.
     private CompletableFuture<Boolean> 
onLwmChanged(ChangeLowWatermarkEventParameters parameters) {
         if (!busyLock.enterBusy()) {
             return falseCompletedFuture();
@@ -1255,20 +1252,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         InternalTable internalTable = table.internalTable();
 
-        if (!nodeProperties.colocationEnabled()) {
-            Set<ByteArray> assignmentKeys = IntStream.range(0, 
internalTable.partitions())
-                    .mapToObj(p -> stablePartAssignmentsKey(new 
TablePartitionId(tableId, p)))
-                    .collect(toSet());
-
-            metaStorageMgr.removeAll(assignmentKeys)
-                    .whenComplete((v, e) -> {
-                        if (e != null) {
-                            LOG.error("Failed to remove assignments from 
metastorage [tableId={}]", e, tableId);
-                        }
-                    });
-        }
-
-        return stopAndDestroyTablePartitions(table)
+        return stopAndDestroyTableProcessors(table)
                 .thenComposeAsync(unused -> inBusyLockAsync(busyLock, () -> 
internalTable.storage().destroy()), ioExecutor)
                 .thenAccept(unused -> inBusyLock(busyLock, () -> {
                     tables.remove(tableId);
@@ -1534,7 +1518,7 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         return CompletableFutures.allOf(storageFuts);
     }
 
-    private CompletableFuture<Void> 
stopAndDestroyTablePartitions(TableViewInternal table) {
+    private CompletableFuture<Void> 
stopAndDestroyTableProcessors(TableViewInternal table) {
         InternalTable internalTable = table.internalTable();
 
         int partitions = internalTable.partitions();
@@ -1549,20 +1533,13 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
             return writeLockAcquisitionFuture.thenCompose(stamp -> {
                 CompletableFuture<?>[] stopReplicaAndDestroyFutures = new 
CompletableFuture<?>[partitions];
 
-                // TODO https://issues.apache.org/jira/browse/IGNITE-24345
-                //  Partitions should be stopped on the assignments change 
event triggered by zone drop or alter.
-                //  Stop replica asynchronously, out of metastorage event 
pipeline.
                 for (int partitionId = 0; partitionId < partitions; 
partitionId++) {
                     CompletableFuture<Void> resourcesUnloadFuture;
 
-                    if (nodeProperties.colocationEnabled()) {
-                        resourcesUnloadFuture = 
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
-                                new ZonePartitionId(internalTable.zoneId(), 
partitionId),
-                                internalTable.tableId()
-                        );
-                    } else {
-                        resourcesUnloadFuture = nullCompletedFuture();
-                    }
+                    resourcesUnloadFuture = 
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
+                            new ZonePartitionId(internalTable.zoneId(), 
partitionId),
+                            internalTable.tableId()
+                    );
 
                     var tablePartitionId = new 
TablePartitionId(internalTable.tableId(), partitionId);
 
@@ -1884,16 +1861,11 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
     }
 
     private void cleanUpResourcesForDroppedTablesOnRecoveryBusy(@Nullable 
HybridTimestamp lwm) {
-        // TODO: IGNITE-20384 Clean up abandoned resources for dropped zones 
from vault and metastore
+        // TODO: IGNITE-20384 Clean up abandoned resources for dropped tables 
from vault and metastore
 
         Set<Integer> aliveTableIds = aliveTables(catalogService, lwm);
 
         destroyMvStoragesForTablesNotIn(aliveTableIds);
-
-        if (!nodeProperties.colocationEnabled()) {
-            destroyTxStateStoragesForTablesNotIn(aliveTableIds);
-            destroyReplicationProtocolStoragesForTablesNotIn(aliveTableIds);
-        }
     }
 
     private void destroyMvStoragesForTablesNotIn(Set<Integer> aliveTableIds) {

Reply via email to