This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d7d0f741e3 IGNITE-18529 Deal with deleting a table/partition and 
rebalancing partitions (#1745)
d7d0f741e3 is described below

commit d7d0f741e35c85fc63c28466c4caad93d160e9e1
Author: Kirill Tkalenko <tkalkir...@yandex.ru>
AuthorDate: Tue Mar 7 11:42:54 2023 +0300

    IGNITE-18529 Deal with deleting a table/partition and rebalancing 
partitions (#1745)
---
 docs/_docs/glossary/glossary.adoc                  |   2 +-
 .../internal/storage/util/MvPartitionStorages.java | 158 +++++++--------------
 .../internal/storage/util/StorageOperation.java    |  87 ++++++++++--
 .../storage/util/MvPartitionStoragesTest.java      | 154 +++++++++++++++-----
 .../storage/AbstractMvTableStorageTest.java        |  18 +++
 .../storage/impl/TestMvPartitionStorage.java       |   2 -
 .../internal/storage/impl/TestMvTableStorage.java  |   6 +-
 .../pagememory/AbstractPageMemoryTableStorage.java |  14 +-
 .../mv/AbstractPageMemoryMvPartitionStorage.java   |   6 +-
 .../storage/rocksdb/RocksDbMvPartitionStorage.java |   6 +-
 .../storage/rocksdb/RocksDbTableStorage.java       |  27 ++--
 11 files changed, 304 insertions(+), 176 deletions(-)

diff --git a/docs/_docs/glossary/glossary.adoc 
b/docs/_docs/glossary/glossary.adoc
index b43156752d..d3c5393817 100644
--- a/docs/_docs/glossary/glossary.adoc
+++ b/docs/_docs/glossary/glossary.adoc
@@ -22,7 +22,7 @@ Cluster Management Group::A subset of Ignite nodes in a Raft 
cluster. Cluster gr
 
 Data Region:: Data regions are used to control the amount of memory available 
to the storage. Depending on the type of storage the data region is assigned 
to, the data may be loaded into RAM or stored
 
-Data Rebalace:: Data rebalance is the process of redistributing partitions to 
make sure they are distributed equally across all nodes in the cluster.
+Data Rebalance:: Data rebalance is the process of redistributing partitions to 
make sure they are distributed equally across all nodes in the cluster.
 
 ==== M
 
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
index 8f8ce9fcbe..cfb9d6446e 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.StorageRebalanceException;
 import 
org.apache.ignite.internal.storage.util.StorageOperation.AbortRebalanceStorageOperation;
 import 
org.apache.ignite.internal.storage.util.StorageOperation.CleanupStorageOperation;
+import 
org.apache.ignite.internal.storage.util.StorageOperation.CloseStorageOperation;
 import 
org.apache.ignite.internal.storage.util.StorageOperation.CreateStorageOperation;
 import 
org.apache.ignite.internal.storage.util.StorageOperation.DestroyStorageOperation;
 import 
org.apache.ignite.internal.storage.util.StorageOperation.FinishRebalanceStorageOperation;
@@ -54,7 +55,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
 
     private final ConcurrentMap<Integer, StorageOperation> 
operationByPartitionId = new ConcurrentHashMap<>();
 
-    private final ConcurrentMap<Integer, CompletableFuture<Void>> 
rebalaceFutureByPartitionId = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Integer, CompletableFuture<Void>> 
rebalanceFutureByPartitionId = new ConcurrentHashMap<>();
 
     /**
      * Constructor.
@@ -127,7 +128,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
         }).whenComplete((storage, throwable) -> 
operationByPartitionId.compute(partitionId, (partId, operation) -> {
             assert operation instanceof CreateStorageOperation : 
createStorageInfo(partitionId) + ", op=" + operation;
 
-            return null;
+            return completeOperation(operation);
         }));
     }
 
@@ -158,9 +159,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
                     operationByPartitionId.compute(partitionId, (partId, 
operation) -> {
                         assert operation instanceof DestroyStorageOperation : 
createStorageInfo(partitionId) + ", op=" + operation;
 
-                        DestroyStorageOperation destroyStorageOperation = 
(DestroyStorageOperation) operation;
-
-                        return 
destroyStorageOperation.getCreateStorageOperation();
+                        return completeOperation(operation);
                     });
 
                     if (throwable == null) {
@@ -198,7 +197,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
                         operationByPartitionId.compute(partitionId, (partId, 
operation) -> {
                             assert operation instanceof 
CleanupStorageOperation : createStorageInfo(partitionId) + ", op=" + operation;
 
-                            return null;
+                            return completeOperation(operation);
                         })
                 );
     }
@@ -213,7 +212,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
      * @throws StorageRebalanceException If the storage does not exist or 
another operation is already in progress.
      * @throws StorageRebalanceException If rebalancing is already in progress.
      */
-    public CompletableFuture<Void> startRebalace(int partitionId, Function<T, 
CompletableFuture<Void>> startRebalanceStorageFunction) {
+    public CompletableFuture<Void> startRebalance(int partitionId, Function<T, 
CompletableFuture<Void>> startRebalanceStorageFunction) {
         operationByPartitionId.compute(partitionId, (partId, operation) -> {
             checkStorageExistsForRebalance(partitionId);
 
@@ -221,7 +220,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
                 throwExceptionDependingOnOperationForRebalance(operation, 
partitionId);
             }
 
-            if (rebalaceFutureByPartitionId.containsKey(partitionId)) {
+            if (rebalanceFutureByPartitionId.containsKey(partitionId)) {
                 throw new 
StorageRebalanceException(createStorageInProgressOfRebalanceErrorMessage(partitionId));
             }
 
@@ -232,7 +231,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
                 .thenCompose(unused -> {
                     CompletableFuture<Void> startRebalanceFuture = 
startRebalanceStorageFunction.apply(get(partitionId));
 
-                    CompletableFuture<Void> old = 
rebalaceFutureByPartitionId.put(partitionId, startRebalanceFuture);
+                    CompletableFuture<Void> old = 
rebalanceFutureByPartitionId.put(partitionId, startRebalanceFuture);
 
                     assert old == null : createStorageInfo(partitionId);
 
@@ -242,7 +241,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
                             assert operation instanceof 
StartRebalanceStorageOperation :
                                     createStorageInfo(partitionId) + ", op=" + 
operation;
 
-                            return null;
+                            return completeOperation(operation);
                         })
                 );
     }
@@ -269,7 +268,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
 
         return completedFuture(null)
                 .thenCompose(unused -> {
-                    CompletableFuture<Void> rebalanceFuture = 
rebalaceFutureByPartitionId.remove(partitionId);
+                    CompletableFuture<Void> rebalanceFuture = 
rebalanceFutureByPartitionId.remove(partitionId);
 
                     if (rebalanceFuture == null) {
                         return completedFuture(null);
@@ -283,7 +282,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
                             assert operation instanceof 
AbortRebalanceStorageOperation :
                                     createStorageInfo(partitionId) + ", op=" + 
operation;
 
-                            return null;
+                            return completeOperation(operation);
                         })
                 );
     }
@@ -306,7 +305,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
                 throwExceptionDependingOnOperationForRebalance(operation, 
partitionId);
             }
 
-            if (!rebalaceFutureByPartitionId.containsKey(partitionId)) {
+            if (!rebalanceFutureByPartitionId.containsKey(partitionId)) {
                 throw new StorageRebalanceException("Storage rebalancing did 
not start: [" + createStorageInfo(partitionId) + ']');
             }
 
@@ -315,7 +314,7 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
 
         return completedFuture(null)
                 .thenCompose(unused -> {
-                    CompletableFuture<Void> rebalanceFuture = 
rebalaceFutureByPartitionId.remove(partitionId);
+                    CompletableFuture<Void> rebalanceFuture = 
rebalanceFutureByPartitionId.remove(partitionId);
 
                     assert rebalanceFuture != null : 
createStorageInfo(partitionId);
 
@@ -325,49 +324,11 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
                             assert operation instanceof 
FinishRebalanceStorageOperation :
                                     createStorageInfo(partitionId) + ", op=" + 
operation;
 
-                            return null;
+                            return completeOperation(operation);
                         })
                 );
     }
 
-    /**
-     * Collects all multi-versioned partition storages to close.
-     */
-    // TODO: IGNITE-18529 We need to wait for all current operations and 
disable new ones
-    public List<T> getAllForClose() {
-        return IntStream.range(0, storageByPartitionId.length())
-                .mapToObj(partitionId -> 
storageByPartitionId.getAndSet(partitionId, null))
-                .filter(Objects::nonNull)
-                .collect(toList());
-    }
-
-    /**
-     * Destroys all created multi-versioned partition storages.
-     *
-     * @param destroyStorageFunction Partition destruction function.
-     * @return Future destruction of all created multi-versioned partition 
storages.
-     */
-    // TODO: IGNITE-18529 We need to deal with parallel operations
-    public CompletableFuture<Void> destroyAll(Function<T, 
CompletableFuture<Void>> destroyStorageFunction) {
-        List<CompletableFuture<Void>> destroyFutures = new ArrayList<>();
-
-        for (int partitionId = 0; partitionId < storageByPartitionId.length(); 
partitionId++) {
-            StorageOperation storageOperation = 
operationByPartitionId.get(partitionId);
-
-            if (storageOperation instanceof DestroyStorageOperation) {
-                destroyFutures.add(((DestroyStorageOperation) 
storageOperation).getDestroyFuture());
-            } else {
-                T storage = storageByPartitionId.getAndSet(partitionId, null);
-
-                if (storage != null) {
-                    destroyFutures.add(destroyStorageFunction.apply(storage));
-                }
-            }
-        }
-
-        return 
CompletableFuture.allOf(destroyFutures.toArray(CompletableFuture[]::new));
-    }
-
     /**
      * Returns table name.
      */
@@ -430,74 +391,63 @@ public class MvPartitionStorages<T extends 
MvPartitionStorage> {
     }
 
     private void throwExceptionDependingOnOperation(StorageOperation 
operation, int partitionId) {
-        if (operation instanceof CreateStorageOperation) {
-            throw new 
StorageException(createStorageInProgressOfCreationErrorMessage(partitionId));
-        } else if (operation instanceof DestroyStorageOperation) {
-            throw new 
StorageException(createStorageInProgressOfDestructionErrorMessage(partitionId));
-        } else if (operation instanceof StartRebalanceStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfStartRebalanceErrorMessage(partitionId));
-        } else if (operation instanceof AbortRebalanceStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfAbortRebalanceErrorMessage(partitionId));
-        } else if (operation instanceof FinishRebalanceStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfFinishRebalanceErrorMessage(partitionId));
-        } else if (operation instanceof CleanupStorageOperation) {
-            throw new 
StorageException(createStorageInProgressOfCleanupErrorMessage(partitionId));
-        } else {
-            throw new 
StorageException(createUnknownOperationErrorMessage(partitionId, operation));
-        }
+        throw new 
StorageException(operation.inProcessErrorMessage(createStorageInfo(partitionId)));
     }
 
     private void 
throwExceptionDependingOnOperationForRebalance(StorageOperation operation, int 
partitionId) {
-        if (operation instanceof CreateStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfCreationErrorMessage(partitionId));
-        } else if (operation instanceof DestroyStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfDestructionErrorMessage(partitionId));
-        } else if (operation instanceof StartRebalanceStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfStartRebalanceErrorMessage(partitionId));
-        } else if (operation instanceof AbortRebalanceStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfAbortRebalanceErrorMessage(partitionId));
-        } else if (operation instanceof FinishRebalanceStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfFinishRebalanceErrorMessage(partitionId));
-        } else if (operation instanceof CleanupStorageOperation) {
-            throw new 
StorageRebalanceException(createStorageInProgressOfCleanupErrorMessage(partitionId));
-        } else {
-            throw new 
StorageRebalanceException(createUnknownOperationErrorMessage(partitionId, 
operation));
-        }
+        throw new 
StorageRebalanceException(operation.inProcessErrorMessage(createStorageInfo(partitionId)));
     }
 
     private String createStorageDoesNotExistErrorMessage(int partitionId) {
         return "Storage does not exist: [" + createStorageInfo(partitionId) + 
']';
     }
 
-    private String createStorageInProgressOfCreationErrorMessage(int 
partitionId) {
-        return "Storage is in process of being created: [" + 
createStorageInfo(partitionId) + ']';
+    private String createStorageInProgressOfRebalanceErrorMessage(int 
partitionId) {
+        return "Storage in the process of rebalance: [" + 
createStorageInfo(partitionId) + ']';
     }
 
-    private String createStorageInProgressOfDestructionErrorMessage(int 
partitionId) {
-        return "Storage is already in process of being destroyed: [" + 
createStorageInfo(partitionId) + ']';
-    }
+    private static @Nullable StorageOperation 
completeOperation(StorageOperation operation) {
+        operation.operationFuture().complete(null);
 
-    private String createStorageInProgressOfStartRebalanceErrorMessage(int 
partitionId) {
-        return "Storage in the process of starting a rebalance: [" + 
createStorageInfo(partitionId) + ']';
-    }
+        if (operation.isFinalOperation()) {
+            return operation;
+        }
 
-    private String createStorageInProgressOfAbortRebalanceErrorMessage(int 
partitionId) {
-        return "Storage in the process of aborting a rebalance: [" + 
createStorageInfo(partitionId) + ']';
+        return operation instanceof DestroyStorageOperation ? 
((DestroyStorageOperation) operation).getCreateStorageOperation() : null;
     }
 
-    private String createStorageInProgressOfFinishRebalanceErrorMessage(int 
partitionId) {
-        return "Storage in the process of finishing a rebalance: [" + 
createStorageInfo(partitionId) + ']';
-    }
+    /**
+     * Returns all storages for closing or destroying after completion of 
operations for all storages.
+     *
+     * <p>After completing the future, when try to perform any operation, 
{@link StorageException} for all storages will be thrown.
+     *
+     * @return Future that at the complete will return all the storages that 
are not destroyed.
+     */
+    public CompletableFuture<List<T>> getAllForCloseOrDestroy() {
+        List<CompletableFuture<Void>> operationFutures = new ArrayList<>();
 
-    private String createStorageInProgressOfRebalanceErrorMessage(int 
partitionId) {
-        return "Storage in the process of rebalance: [" + 
createStorageInfo(partitionId) + ']';
-    }
+        for (int partitionId = 0; partitionId < storageByPartitionId.length(); 
partitionId++) {
+            StorageOperation storageOperation = 
operationByPartitionId.compute(partitionId, (partId, operation) -> {
+                if (operation == null) {
+                    operation = new CloseStorageOperation();
+                }
 
-    private String createStorageInProgressOfCleanupErrorMessage(int 
partitionId) {
-        return "Storage is in process of being cleaned up: [" + 
createStorageInfo(partitionId) + ']';
-    }
+                operation.markFinalOperation();
+
+                return operation;
+            });
 
-    private String createUnknownOperationErrorMessage(int partitionId, 
StorageOperation operation) {
-        return "Unknown operation: [" + createStorageInfo(partitionId) + ", 
operation=" + operation + ']';
+            if (!(storageOperation instanceof CloseStorageOperation)) {
+                operationFutures.add(storageOperation.operationFuture());
+            }
+        }
+
+        return 
CompletableFuture.allOf(operationFutures.toArray(CompletableFuture[]::new))
+                .thenApply(unused ->
+                        IntStream.range(0, storageByPartitionId.length())
+                                .mapToObj(partitionId -> 
storageByPartitionId.getAndSet(partitionId, null))
+                                .filter(Objects::nonNull)
+                                .collect(toList())
+                );
     }
 }
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
index 09b8156cde..81c1a57e45 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/StorageOperation.java
@@ -24,17 +24,53 @@ import org.jetbrains.annotations.Nullable;
 /**
  * Storage operations.
  */
-interface StorageOperation {
+abstract class StorageOperation {
+    private final CompletableFuture<Void> operationFuture = new 
CompletableFuture<>();
+
+    private volatile boolean finalOperation;
+
+    /**
+     * Returns future completion of the operation.
+     */
+    CompletableFuture<Void> operationFuture() {
+        return operationFuture;
+    }
+
+    /**
+     * Return {@code true} if the operation is the final.
+     */
+    boolean isFinalOperation() {
+        return finalOperation;
+    }
+
+    /**
+     * Marks the operation as final.
+     */
+    void markFinalOperation() {
+        finalOperation = true;
+    }
+
+    /**
+     * Creates an error message indicating that the current operation is in 
progress or closed.
+     *
+     * @param storageInfo Storage information in the format "table=user, 
partitionId=1".
+     */
+    abstract String inProcessErrorMessage(String storageInfo);
+
     /**
      * Storage creation operation.
      */
-    class CreateStorageOperation implements StorageOperation {
+    static class CreateStorageOperation extends StorageOperation {
+        @Override
+        String inProcessErrorMessage(String storageInfo) {
+            return "Storage is in process of being created: [" + storageInfo + 
']';
+        }
     }
 
     /**
      * Storage destruction operation.
      */
-    class DestroyStorageOperation implements StorageOperation {
+    static class DestroyStorageOperation extends StorageOperation {
         private final CompletableFuture<Void> destroyFuture = new 
CompletableFuture<>();
 
         private final AtomicReference<CreateStorageOperation> 
createStorageOperationReference = new AtomicReference<>();
@@ -45,46 +81,77 @@ interface StorageOperation {
          * @param createStorageOperation Storage creation operation.
          * @return {@code True} if the operation was set by current method 
invocation, {@code false} if by another method invocation.
          */
-        public boolean setCreationOperation(CreateStorageOperation 
createStorageOperation) {
+        boolean setCreationOperation(CreateStorageOperation 
createStorageOperation) {
             return createStorageOperationReference.compareAndSet(null, 
createStorageOperation);
         }
 
         /**
          * Returns {@link #setCreationOperation(CreateStorageOperation) set} a 
storage creation operation.
          */
-        public @Nullable CreateStorageOperation getCreateStorageOperation() {
+        @Nullable CreateStorageOperation getCreateStorageOperation() {
             return createStorageOperationReference.get();
         }
 
         /**
          * Returns the storage destruction future.
          */
-        public CompletableFuture<Void> getDestroyFuture() {
+        CompletableFuture<Void> getDestroyFuture() {
             return destroyFuture;
         }
+
+        @Override
+        String inProcessErrorMessage(String storageInfo) {
+            return "Storage is already in process of being destroyed: [" + 
storageInfo + ']';
+        }
     }
 
     /**
      * Storage rebalancing start operation.
      */
-    class StartRebalanceStorageOperation implements StorageOperation {
+    static class StartRebalanceStorageOperation extends StorageOperation {
+        @Override
+        String inProcessErrorMessage(String storageInfo) {
+            return "Storage in the process of starting a rebalance: [" + 
storageInfo + ']';
+        }
     }
 
     /**
      * Storage rebalancing abort operation.
      */
-    class AbortRebalanceStorageOperation implements StorageOperation {
+    static class AbortRebalanceStorageOperation extends StorageOperation {
+        @Override
+        String inProcessErrorMessage(String storageInfo) {
+            return "Storage in the process of aborting a rebalance: [" + 
storageInfo + ']';
+        }
     }
 
     /**
      * Storage rebalancing finish operation.
      */
-    class FinishRebalanceStorageOperation implements StorageOperation {
+    static class FinishRebalanceStorageOperation extends StorageOperation {
+        @Override
+        String inProcessErrorMessage(String storageInfo) {
+            return "Storage in the process of finishing a rebalance: [" + 
storageInfo + ']';
+        }
     }
 
     /**
      * Storage cleanup operation.
      */
-    class CleanupStorageOperation implements StorageOperation {
+    static class CleanupStorageOperation extends StorageOperation {
+        @Override
+        String inProcessErrorMessage(String storageInfo) {
+            return "Storage is in process of being cleaned up: [" + 
storageInfo + ']';
+        }
+    }
+
+    /**
+     * Storage close operation.
+     */
+    static class CloseStorageOperation extends StorageOperation {
+        @Override
+        String inProcessErrorMessage(String storageInfo) {
+            return "Storage is in the process of closing: [" + storageInfo + 
']';
+        }
     }
 }
diff --git 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
index aed8c7d2e5..345c070f1e 100644
--- 
a/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
+++ 
b/modules/storage-api/src/test/java/org/apache/ignite/internal/storage/util/MvPartitionStoragesTest.java
@@ -24,7 +24,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willFailFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willTimeoutFast;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -35,6 +35,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import 
org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -53,7 +54,7 @@ import org.junit.jupiter.api.function.Executable;
  */
 @ExtendWith(ConfigurationExtension.class)
 public class MvPartitionStoragesTest {
-    @InjectConfiguration
+    @InjectConfiguration("mock.partitions = 10")
     private TableConfiguration tableConfig;
 
     private MvPartitionStorages<MvPartitionStorage> mvPartitionStorages;
@@ -312,7 +313,7 @@ public class MvPartitionStoragesTest {
         CompletableFuture<Void> finishStartRebalanceMvStorage = new 
CompletableFuture<>();
 
         CompletableFuture<?> startRebalanceFuture = runAsync(() ->
-                assertThat(mvPartitionStorages.startRebalace(0, mvStorage -> {
+                assertThat(mvPartitionStorages.startRebalance(0, mvStorage -> {
                     startStartRebalanceMvStorage.complete(null);
 
                     return finishStartRebalanceMvStorage;
@@ -354,7 +355,7 @@ public class MvPartitionStoragesTest {
         // What if there is an error during the operation?
 
         assertThat(
-                mvPartitionStorages.startRebalace(0, mvStorage -> 
failedFuture(new RuntimeException("from test"))),
+                mvPartitionStorages.startRebalance(0, mvStorage -> 
failedFuture(new RuntimeException("from test"))),
                 willFailFast(RuntimeException.class)
         );
 
@@ -421,7 +422,7 @@ public class MvPartitionStoragesTest {
         invokeAbortFunction.set(false);
 
         assertThat(
-                mvPartitionStorages.startRebalace(0, mvStorage -> 
failedFuture(new RuntimeException("from test"))),
+                mvPartitionStorages.startRebalance(0, mvStorage -> 
failedFuture(new RuntimeException("from test"))),
                 willFailFast(RuntimeException.class)
         );
 
@@ -512,7 +513,7 @@ public class MvPartitionStoragesTest {
         assertThat(abortRebalanceMvStorage(0), willCompleteSuccessfully());
 
         assertThat(
-                mvPartitionStorages.startRebalace(0, mvStorage -> 
failedFuture(new RuntimeException("from test"))),
+                mvPartitionStorages.startRebalance(0, mvStorage -> 
failedFuture(new RuntimeException("from test"))),
                 willFailFast(RuntimeException.class)
         );
 
@@ -520,49 +521,132 @@ public class MvPartitionStoragesTest {
     }
 
     @Test
-    void testGetAllForClose() {
-        MvPartitionStorage storage0 = mock(MvPartitionStorage.class);
-        MvPartitionStorage storage1 = mock(MvPartitionStorage.class);
+    void testGetAllForCloseOrDestroy() {
+        CompletableFuture<MvPartitionStorage> mvStorage0 = createMvStorage(0);
+        CompletableFuture<MvPartitionStorage> mvStorage1 = createMvStorage(1);
+        CompletableFuture<MvPartitionStorage> mvStorage2 = createMvStorage(2);
+        CompletableFuture<MvPartitionStorage> mvStorage3 = createMvStorage(3);
+        CompletableFuture<MvPartitionStorage> mvStorage4 = createMvStorage(4);
+        CompletableFuture<MvPartitionStorage> mvStorage5 = createMvStorage(5);
 
-        assertThat(mvPartitionStorages.create(0, partId -> storage0), 
willCompleteSuccessfully());
-        assertThat(mvPartitionStorages.create(1, partId -> storage1), 
willCompleteSuccessfully());
+        assertThat(mvStorage0, willCompleteSuccessfully());
+        assertThat(mvStorage1, willCompleteSuccessfully());
+        assertThat(mvStorage2, willCompleteSuccessfully());
+        assertThat(mvStorage3, willCompleteSuccessfully());
+        assertThat(mvStorage4, willCompleteSuccessfully());
+        assertThat(mvStorage5, willCompleteSuccessfully());
 
-        assertThat(mvPartitionStorages.getAllForClose(), contains(storage0, 
storage1));
+        assertThat(destroyMvStorage(1), willCompleteSuccessfully());
+        assertThat(clearMvStorage(2), willCompleteSuccessfully());
+        assertThat(startRebalanceMvStorage(3), willCompleteSuccessfully());
+
+        assertThat(startRebalanceMvStorage(4), willCompleteSuccessfully());
+        assertThat(abortRebalanceMvStorage(4), willCompleteSuccessfully());
+
+        assertThat(startRebalanceMvStorage(4), willCompleteSuccessfully());
+        assertThat(finishRebalanceMvStorage(4), willCompleteSuccessfully());
+
+        CompletableFuture<List<MvPartitionStorage>> allForCloseOrDestroy = 
mvPartitionStorages.getAllForCloseOrDestroy();
+
+        assertThat(allForCloseOrDestroy, willCompleteSuccessfully());
+
+        // One less, since we destroyed 1 storage.
+        assertThat(
+                allForCloseOrDestroy.join(),
+                containsInAnyOrder(mvStorage0.join(), mvStorage2.join(), 
mvStorage3.join(), mvStorage4.join(), mvStorage5.join())
+        );
+
+        // What happens if we try to perform operations on storages?
+        assertThrowsWithMessage(StorageException.class, () -> 
createMvStorage(6), "Storage is in the process of closing");
+        assertThrowsWithMessage(StorageException.class, () -> 
destroyMvStorage(0), "Storage does not exist");
+        assertThrows(StorageException.class, () -> clearMvStorage(0), "Storage 
does not exist");
+        assertThrows(StorageException.class, () -> startRebalanceMvStorage(0), 
"Storage does not exist");
+        assertThrows(StorageException.class, () -> abortRebalanceMvStorage(0), 
"Storage does not exist");
+        assertThrows(StorageException.class, () -> 
finishRebalanceMvStorage(0), "Storage does not exist");
     }
 
     @Test
-    void testDestroyAll() {
-        MvPartitionStorage storage0 = mock(MvPartitionStorage.class);
-        MvPartitionStorage storage1 = mock(MvPartitionStorage.class);
-
-        assertThat(mvPartitionStorages.create(0, partId -> storage0), 
willCompleteSuccessfully());
-        assertThat(mvPartitionStorages.create(1, partId -> storage1), 
willCompleteSuccessfully());
+    void testWaitOperationOnGetAllForCloseOrDestroy() {
+        CompletableFuture<Void> createStorageOperationFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> destroyStorageOperationFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> clearStorageOperationFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> startRebalanceStorageOperationFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> abortRebalanceStorageOperationFuture = new 
CompletableFuture<>();
+        CompletableFuture<Void> finishRebalanceStorageOperationFuture = new 
CompletableFuture<>();
+
+        CompletableFuture<?> create0StorageFuture = runAsync(() -> 
mvPartitionStorages.create(0, partId -> {
+            assertThat(createStorageOperationFuture, 
willCompleteSuccessfully());
+
+            return mock(MvPartitionStorage.class);
+        }));
+
+        assertThat(createMvStorage(1), willCompleteSuccessfully());
+        assertThat(createMvStorage(2), willCompleteSuccessfully());
+        assertThat(createMvStorage(3), willCompleteSuccessfully());
+        assertThat(createMvStorage(4), willCompleteSuccessfully());
+        assertThat(createMvStorage(5), willCompleteSuccessfully());
+
+        CompletableFuture<Void> destroy1StorageFuture = 
mvPartitionStorages.destroy(1, storage -> destroyStorageOperationFuture);
+        CompletableFuture<Void> clear2StorageFuture = 
mvPartitionStorages.clear(2, storage -> clearStorageOperationFuture);
+
+        CompletableFuture<Void> startRebalance3StorageFuture = 
mvPartitionStorages.startRebalance(
+                3,
+                storage -> startRebalanceStorageOperationFuture
+        );
 
-        CompletableFuture<Void> startDestroyMvStorage0Future = new 
CompletableFuture<>();
-        CompletableFuture<Void> finishDestroyMvStorage0Future = new 
CompletableFuture<>();
+        assertThat(startRebalanceMvStorage(4), willCompleteSuccessfully());
+        assertThat(startRebalanceMvStorage(5), willCompleteSuccessfully());
 
-        CompletableFuture<?> destroyMvStorage0Future = runAsync(() ->
-                assertThat(mvPartitionStorages.destroy(0, mvStorage -> {
-                    startDestroyMvStorage0Future.complete(null);
+        CompletableFuture<Void> abortRebalance4StorageFuture = 
mvPartitionStorages.abortRebalance(
+                4,
+                storage -> abortRebalanceStorageOperationFuture
+        );
 
-                    return finishDestroyMvStorage0Future;
-                }), willCompleteSuccessfully())
+        CompletableFuture<Void> finishRebalance5StorageFuture = 
mvPartitionStorages.finishRebalance(
+                5,
+                storage -> finishRebalanceStorageOperationFuture
         );
 
-        assertThat(startDestroyMvStorage0Future, willCompleteSuccessfully());
+        CompletableFuture<List<MvPartitionStorage>> allForCloseOrDestroyFuture 
= mvPartitionStorages.getAllForCloseOrDestroy();
 
-        CompletableFuture<Void> destroyAllMvStoragesFuture = 
mvPartitionStorages.destroyAll(mvStorage -> {
-            assertSame(mvStorage, storage1);
+        assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
 
-            return completedFuture(null);
-        });
+        // Let's finish creating the storage.
+        createStorageOperationFuture.complete(null);
+
+        assertThat(create0StorageFuture, willCompleteSuccessfully());
+        assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
+
+        // Let's finish destroying the storage.
+        destroyStorageOperationFuture.complete(null);
+
+        assertThat(destroy1StorageFuture, willCompleteSuccessfully());
+        assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
+
+        // Let's finish clearing the storage.
+        clearStorageOperationFuture.complete(null);
+
+        assertThat(clear2StorageFuture, willCompleteSuccessfully());
+        assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
+
+        // Let's finish starting rebalance the storage.
+        startRebalanceStorageOperationFuture.complete(null);
+
+        assertThat(startRebalance3StorageFuture, willCompleteSuccessfully());
+        assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
+
+        // Let's finish aborting rebalance the storage.
+        abortRebalanceStorageOperationFuture.complete(null);
+
+        assertThat(abortRebalance4StorageFuture, willCompleteSuccessfully());
+        assertThat(allForCloseOrDestroyFuture, willTimeoutFast());
 
-        assertThat(destroyAllMvStoragesFuture, willTimeoutFast());
+        // Let's finish finishing rebalance the storage.
+        finishRebalanceStorageOperationFuture.complete(null);
 
-        finishDestroyMvStorage0Future.complete(null);
+        assertThat(finishRebalance5StorageFuture, willCompleteSuccessfully());
 
-        assertThat(destroyMvStorage0Future, willCompleteSuccessfully());
-        assertThat(destroyAllMvStoragesFuture, willCompleteSuccessfully());
+        assertThat(allForCloseOrDestroyFuture, willCompleteSuccessfully());
     }
 
     private MvPartitionStorage getMvStorage(int partitionId) {
@@ -582,7 +666,7 @@ public class MvPartitionStoragesTest {
     }
 
     private CompletableFuture<Void> startRebalanceMvStorage(int partitionId) {
-        return mvPartitionStorages.startRebalace(partitionId, mvStorage -> 
completedFuture(null));
+        return mvPartitionStorages.startRebalance(partitionId, mvStorage -> 
completedFuture(null));
     }
 
     private CompletableFuture<Void> abortRebalanceMvStorage(int partitionId) {
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index b940f280f1..21e04bcf5a 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -711,6 +711,24 @@ public abstract class AbstractMvTableStorageTest extends 
BaseMvStoragesTest {
         }
     }
 
+    @Test
+    void testCloseStartedRebalance() {
+        MvPartitionStorage mvPartitionStorage = 
getOrCreateMvPartition(PARTITION_ID);
+
+        assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+
+        assertDoesNotThrow(mvPartitionStorage::close);
+    }
+
+    @Test
+    void testDestroyStartedRebalance() {
+        getOrCreateMvPartition(PARTITION_ID);
+
+        assertThat(tableStorage.startRebalancePartition(PARTITION_ID), 
willCompleteSuccessfully());
+
+        assertThat(tableStorage.destroyPartition(PARTITION_ID), 
willCompleteSuccessfully());
+    }
+
     private static void createTestIndexes(TablesConfiguration tablesConfig) {
         List<IndexDefinition> indexDefinitions = List.of(
                 SchemaBuilders.sortedIndex(SORTED_INDEX_NAME)
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index a5c435df51..f88c0b4ecb 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -551,8 +551,6 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public void close() {
-        assert !rebalance;
-
         closed = true;
 
         clear0();
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
index a049b4da13..251af1fde2 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvTableStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.storage.impl;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
 import static org.mockito.Mockito.spy;
@@ -212,12 +213,13 @@ public class TestMvTableStorage implements MvTableStorage 
{
     public CompletableFuture<Void> destroy() {
         stop();
 
-        return mvPartitionStorages.destroyAll(this::destroyPartition);
+        return mvPartitionStorages.getAllForCloseOrDestroy()
+                .thenCompose(mvStorages -> 
allOf(mvStorages.stream().map(this::destroyPartition).toArray(CompletableFuture[]::new)));
     }
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        return mvPartitionStorages.startRebalace(partitionId, 
mvPartitionStorage -> {
+        return mvPartitionStorages.startRebalance(partitionId, 
mvPartitionStorage -> {
             mvPartitionStorage.startRebalance();
 
             
testHashIndexStorageStream(partitionId).forEach(TestHashIndexStorage::startRebalance);
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
index 7141005556..697cfd6b48 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/AbstractPageMemoryTableStorage.java
@@ -17,13 +17,16 @@
 
 package org.apache.ignite.internal.storage.pagememory;
 
+import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
 import static 
org.apache.ignite.internal.storage.util.StorageUtils.createMissingMvPartitionErrorMessage;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.pagememory.DataRegion;
@@ -103,7 +106,11 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
         busyLock.block();
 
         try {
-            
IgniteUtils.closeAll(mvPartitionStorages.getAllForClose().stream().map(mvPartitionStorage
 -> mvPartitionStorage::close));
+            CompletableFuture<List<AbstractPageMemoryMvPartitionStorage>> 
allForCloseOrDestroy
+                    = mvPartitionStorages.getAllForCloseOrDestroy();
+
+            // 10 seconds is taken by analogy with shutdown of thread pool, in 
general this should be fairly fast.
+            IgniteUtils.closeAllManually(allForCloseOrDestroy.get(10, 
TimeUnit.SECONDS).stream());
         } catch (Exception e) {
             throw new StorageException("Failed to stop PageMemory table 
storage: " + getTableName(), e);
         }
@@ -117,7 +124,8 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
 
         busyLock.block();
 
-        return mvPartitionStorages.destroyAll(this::destroyMvPartitionStorage)
+        return mvPartitionStorages.getAllForCloseOrDestroy()
+                .thenCompose(storages -> 
allOf(storages.stream().map(this::destroyMvPartitionStorage).toArray(CompletableFuture[]::new)))
                 .whenComplete((unused, throwable) -> {
                     if (throwable == null) {
                         finishDestruction();
@@ -209,7 +217,7 @@ public abstract class AbstractPageMemoryTableStorage 
implements MvTableStorage {
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        return inBusyLock(busyLock, () -> 
mvPartitionStorages.startRebalace(partitionId, mvPartitionStorage -> {
+        return inBusyLock(busyLock, () -> 
mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> {
             mvPartitionStorage.startRebalance();
 
             return clearStorageAndUpdateDataStructures(mvPartitionStorage)
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 5516a5f127..56c529375f 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -751,11 +751,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
      * @param goingToDestroy If the closure is in preparation for destruction.
      */
     private void close(boolean goingToDestroy) {
-        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
-            StorageState state = this.state.get();
-
-            assert state == StorageState.CLOSED : 
IgniteStringFormatter.format("{}, state={}", createStorageInfo(), state);
+        StorageState previous = state.getAndSet(StorageState.CLOSED);
 
+        if (previous == StorageState.CLOSED) {
             return;
         }
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 31278c1a90..452579db0d 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1017,11 +1017,9 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
 
     @Override
     public void close() {
-        if (!state.compareAndSet(StorageState.RUNNABLE, StorageState.CLOSED)) {
-            StorageState state = this.state.get();
-
-            assert state == StorageState.CLOSED : state;
+        StorageState previous = state.getAndSet(StorageState.CLOSED);
 
+        if (previous == StorageState.CLOSED) {
             return;
         }
 
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index f7b75eb41f..745bd6dbfb 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -41,6 +41,7 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -358,19 +359,23 @@ public class RocksDbTableStorage implements 
MvTableStorage {
 
         resources.add(writeOptions);
 
-        mvPartitionStorages.getAllForClose().forEach(mvPartitionStorage -> 
resources.add(mvPartitionStorage::close));
-
-        for (HashIndex index : hashIndices.values()) {
-            resources.add(index::close);
-        }
+        try {
+            mvPartitionStorages
+                    .getAllForCloseOrDestroy()
+                    // 10 seconds is taken by analogy with shutdown of thread 
pool, in general this should be fairly fast.
+                    .get(10, TimeUnit.SECONDS)
+                    .forEach(mvPartitionStorage -> 
resources.add(mvPartitionStorage::close));
+
+            for (HashIndex index : hashIndices.values()) {
+                resources.add(index::close);
+            }
 
-        for (SortedIndex index : sortedIndices.values()) {
-            resources.add(index::close);
-        }
+            for (SortedIndex index : sortedIndices.values()) {
+                resources.add(index::close);
+            }
 
-        Collections.reverse(resources);
+            Collections.reverse(resources);
 
-        try {
             IgniteUtils.closeAll(resources);
         } catch (Exception e) {
             throw new StorageException("Failed to stop RocksDB table storage: 
" + getTableName(), e);
@@ -605,7 +610,7 @@ public class RocksDbTableStorage implements MvTableStorage {
 
     @Override
     public CompletableFuture<Void> startRebalancePartition(int partitionId) {
-        return inBusyLock(busyLock, () -> 
mvPartitionStorages.startRebalace(partitionId, mvPartitionStorage -> {
+        return inBusyLock(busyLock, () -> 
mvPartitionStorages.startRebalance(partitionId, mvPartitionStorage -> {
             try (WriteBatch writeBatch = new WriteBatch()) {
                 mvPartitionStorage.startRebalance(writeBatch);
 


Reply via email to