GG-11860 Implement snapshot status on platform level
-refactoring RESTORE

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a62cc454
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a62cc454
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a62cc454

Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test
Commit: a62cc454466f921e2edd26bb0a6e0646bdb3a7f1
Parents: ef35f4f
Author: EdShangGG <[email protected]>
Authored: Thu Mar 2 16:33:58 2017 +0300
Committer: EdShangGG <[email protected]>
Committed: Thu Mar 2 16:34:27 2017 +0300

----------------------------------------------------------------------
 .../pagemem/snapshot/SnapshotOperation.java     |   3 +-
 .../GridDhtPartitionsExchangeFuture.java        | 119 +++++++++++++++++--
 2 files changed, 109 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a62cc454/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
index 3f84b97..f3b5eee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java
@@ -105,7 +105,8 @@ public class SnapshotOperation implements Serializable {
      * @param op Op.
      */
     public static Collection<File> getOptionalPathsParameter(SnapshotOperation 
op) {
-        assert op.type() == SnapshotOperationType.CHECK || op.extraParameter() 
instanceof Collection;
+        assert (op.type() == SnapshotOperationType.CHECK || op.type() == 
SnapshotOperationType.RESTORE)
+            && (op.extraParameter() == null || op.extraParameter() instanceof 
Collection);
 
         return (Collection<File>)op.extraParameter();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/a62cc454/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 987ba54..4c179e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -48,6 +48,8 @@ import 
org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.events.DiscoveryCustomEvent;
 import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
 import 
org.apache.ignite.internal.managers.discovery.GridDiscoveryTopologySnapshot;
+import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation;
+import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperationType;
 import 
org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
@@ -59,6 +61,7 @@ import 
org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -81,6 +84,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteRunnable;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -510,10 +514,36 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
                     exchange = onCacheChangeRequest(crdNode);
                 }
-                else if (msg instanceof 
StartSnapshotOperationAckDiscoveryMessage)
+                else if (msg instanceof 
StartSnapshotOperationAckDiscoveryMessage) {
                     exchange = CU.clientNode(discoEvt.eventNode()) ?
                         onClientNodeEvent(crdNode) :
                         onServerNodeEvent(crdNode);
+
+                    StartSnapshotOperationAckDiscoveryMessage 
snapshotOperationMsg = (StartSnapshotOperationAckDiscoveryMessage)msg;
+
+                    if (!cctx.localNode().isDaemon()) {
+                        SnapshotOperation op = 
snapshotOperationMsg.snapshotOperation();
+
+                        if (op.type() == SnapshotOperationType.RESTORE) {
+                            if (reqs != null)
+                                reqs = new ArrayList<>(reqs);
+                            else
+                                reqs = new ArrayList<>();
+
+                            List<DynamicCacheChangeRequest> destroyRequests = 
getStopCacheRequests(
+                                cctx.cache(), op.cacheNames(), 
cctx.localNodeId());
+
+                            reqs.addAll(destroyRequests);
+
+                            if (!reqs.isEmpty()) { //Emulate destroy cache 
request
+                                if (op.type() == SnapshotOperationType.RESTORE)
+                                    cctx.cache().onCustomEvent(new 
DynamicCacheChangeBatch(reqs), topVer);
+
+                                onCacheChangeRequest(crdNode);
+                            }
+                        }
+                    }
+                }
                 else {
                     assert affChangeMsg != null : this;
 
@@ -578,6 +608,36 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param cache Cache.
+     * @param cacheNames Cache names.
+     * @param locNodeId Local node id.
+     */
+    @NotNull public static List<DynamicCacheChangeRequest> 
getStopCacheRequests(GridCacheProcessor cache,
+        Set<String> cacheNames, UUID locNodeId) {
+        List<DynamicCacheChangeRequest> destroyRequests = new ArrayList<>();
+
+        for (String cacheName : cacheNames) {
+            DynamicCacheDescriptor desc = 
cache.cacheDescriptor(CU.cacheId(cacheName));
+
+            if (desc == null)
+                continue;
+
+            DynamicCacheChangeRequest t = new 
DynamicCacheChangeRequest(UUID.randomUUID(), cacheName, locNodeId);
+
+            t.stop(true);
+            t.destroy(true);
+
+            t.deploymentId(desc.deploymentId());
+
+            t.restart(true);
+
+            destroyRequests.add(t);
+        }
+
+        return destroyRequests;
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     private void initTopologies() throws IgniteCheckedException {
@@ -806,19 +866,15 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
 
         cctx.database().beforeExchange(this);
 
-        // If a backup request, synchronously wait for backup start.
-        if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
-            DiscoveryCustomMessage customMsg = 
((DiscoveryCustomEvent)discoEvt).customMessage();
-
-            if (customMsg instanceof 
StartSnapshotOperationAckDiscoveryMessage) {
-                StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg 
= (StartSnapshotOperationAckDiscoveryMessage)customMsg;
+        StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = 
getSnapshotOperationMessage();
 
-                if (!cctx.localNode().isClient() && 
!cctx.localNode().isDaemon()) {
-                    IgniteInternalFuture fut = 
cctx.database().startLocalSnapshotOperation(snapshotOperationMsg);
+        // For a snapshot operation request other than RESTORE, synchronously wait for the local operation to start.
+        if (snapshotOperationMsg != null) {
+            if (!cctx.localNode().isClient() && !cctx.localNode().isDaemon()) {
+                SnapshotOperation op = 
snapshotOperationMsg.snapshotOperation();
 
-                    if (fut != null)
-                        fut.get();
-                }
+                if (op.type() != SnapshotOperationType.RESTORE)
+                    startLocalSnasphotOperation(snapshotOperationMsg);
             }
         }
 
@@ -833,6 +889,17 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
     }
 
     /**
+     * @param snapshotOperationMsg Snapshot operation message.
+     */
+    private void 
startLocalSnasphotOperation(StartSnapshotOperationAckDiscoveryMessage 
snapshotOperationMsg
+    ) throws IgniteCheckedException {
+        IgniteInternalFuture fut = 
cctx.database().startLocalSnapshotOperation(snapshotOperationMsg);
+
+        if (fut != null)
+            fut.get();
+    }
+
+    /**
      * @throws IgniteCheckedException If failed.
      */
     private void waitPartitionRelease() throws IgniteCheckedException {
@@ -1168,6 +1235,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
                 cctx.cache().completeStartFuture(req);
         }
 
+        StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg = 
getSnapshotOperationMessage();
+
+        if (snapshotOperationMsg != null && !cctx.localNode().isClient() && 
!cctx.localNode().isDaemon()) {
+            SnapshotOperation op = snapshotOperationMsg.snapshotOperation();
+
+            if (op.type() == SnapshotOperationType.RESTORE)
+                try {
+                    startLocalSnasphotOperation(snapshotOperationMsg);
+                }
+                catch (IgniteCheckedException e) {
+                    log.error("Error while starting snapshot operation", e);
+                }
+        }
+
         if (exchangeOnChangeGlobalState && err == null)
             cctx.kernalContext().state().onExchangeDone();
 
@@ -1196,6 +1277,20 @@ public class GridDhtPartitionsExchangeFuture extends 
GridFutureAdapter<AffinityT
         return dummy;
     }
 
+    /**
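+     * @return Snapshot operation message carried by the discovery event, or {@code null} if there is none.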
+     */
+    private StartSnapshotOperationAckDiscoveryMessage 
getSnapshotOperationMessage() {
+        // Only a custom discovery event can carry a snapshot operation message.
+        if (discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
+            DiscoveryCustomMessage customMsg = 
((DiscoveryCustomEvent)discoEvt).customMessage();
+
+            if (customMsg instanceof StartSnapshotOperationAckDiscoveryMessage)
+                return  (StartSnapshotOperationAckDiscoveryMessage)customMsg;
+        }
+        return null;
+    }
+
     /** {@inheritDoc} */
     @Nullable @Override public Throwable validateCache(
         GridCacheContext cctx,

Reply via email to