IGNITE-7676 Add affinity version to snapshot plugin stub - Fixes #3510.

Signed-off-by: Alexey Goncharuk <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b6d21fb7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b6d21fb7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b6d21fb7

Branch: refs/heads/ignite-7725
Commit: b6d21fb7d258f4b2b7d705be92a4f9c83e045135
Parents: bcd3881
Author: EdShangGG <[email protected]>
Authored: Fri Feb 16 16:29:49 2018 +0300
Committer: Alexey Goncharuk <[email protected]>
Committed: Fri Feb 16 16:29:49 2018 +0300

----------------------------------------------------------------------
 .../GridDhtPartitionsExchangeFuture.java        |  2 +-
 .../GridCacheDatabaseSharedManager.java         | 64 ++++++++++----------
 .../snapshot/IgniteCacheSnapshotManager.java    |  9 ++-
 3 files changed, 41 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b6d21fb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 73e6417..7a1265e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -1136,7 +1136,7 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
         try {
             long start = U.currentTimeMillis();
 
-            IgniteInternalFuture fut = 
cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt);
+            IgniteInternalFuture fut = 
cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt, 
exchId.topologyVersion());
 
             if (fut != null) {
                 fut.get();

http://git-wip-us.apache.org/repos/asf/ignite/blob/b6d21fb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index e53bd44..fde5fb4 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -2092,54 +2092,56 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
      * @param partStates Partition to restore state.
      */
     public void applyUpdatesOnRecovery(
-        WALIterator it,
+        @Nullable WALIterator it,
         IgnitePredicate<IgniteBiTuple<WALPointer, WALRecord>> recPredicate,
         IgnitePredicate<DataEntry> entryPredicate,
         Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
     ) throws IgniteCheckedException {
-        while (it.hasNextX()) {
-            IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
+        if (it != null) {
+            while (it.hasNextX()) {
+                IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
 
-            WALRecord rec = next.get2();
+                WALRecord rec = next.get2();
 
-            if (!recPredicate.apply(next))
-                break;
+                if (!recPredicate.apply(next))
+                    break;
 
-            switch (rec.type()) {
-                case DATA_RECORD:
-                    checkpointReadLock();
+                switch (rec.type()) {
+                    case DATA_RECORD:
+                        checkpointReadLock();
 
-                    try {
-                        DataRecord dataRec = (DataRecord) rec;
+                        try {
+                            DataRecord dataRec = (DataRecord)rec;
 
-                        for (DataEntry dataEntry : dataRec.writeEntries()) {
-                            if (entryPredicate.apply(dataEntry)) {
-                                checkpointReadLock();
+                            for (DataEntry dataEntry : dataRec.writeEntries()) 
{
+                                if (entryPredicate.apply(dataEntry)) {
+                                    checkpointReadLock();
 
-                                try {
-                                    int cacheId = dataEntry.cacheId();
+                                    try {
+                                        int cacheId = dataEntry.cacheId();
 
-                                    GridCacheContext cacheCtx = 
cctx.cacheContext(cacheId);
+                                        GridCacheContext cacheCtx = 
cctx.cacheContext(cacheId);
 
-                                    if (cacheCtx != null)
-                                        applyUpdate(cacheCtx, dataEntry);
-                                    else if (log != null)
-                                        log.warning("Cache (cacheId=" + 
cacheId + ") is not started, can't apply updates.");
-                                }
-                                finally {
-                                    checkpointReadUnlock();
+                                        if (cacheCtx != null)
+                                            applyUpdate(cacheCtx, dataEntry);
+                                        else if (log != null)
+                                            log.warning("Cache (cacheId=" + 
cacheId + ") is not started, can't apply updates.");
+                                    }
+                                    finally {
+                                        checkpointReadUnlock();
+                                    }
                                 }
                             }
                         }
-                    }
-                    finally {
-                        checkpointReadUnlock();
-                    }
+                        finally {
+                            checkpointReadUnlock();
+                        }
 
-                    break;
+                        break;
 
-                default:
-                    // Skip other records.
+                    default:
+                        // Skip other records.
+                }
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b6d21fb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
index c23adda..16cc8f5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java
@@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
@@ -48,9 +49,12 @@ public class IgniteCacheSnapshotManager<T extends 
SnapshotOperation> extends Gri
      * Try to start local snapshot operation if it's required by discovery 
event.
      *
      * @param discoveryEvent Discovery event.
+     * @param topVer topology version on the moment when this method was called
+     *
+     * @throws IgniteCheckedException if failed
      */
     @Nullable public IgniteInternalFuture tryStartLocalSnapshotOperation(
-            @Nullable DiscoveryEvent discoveryEvent
+            @Nullable DiscoveryEvent discoveryEvent, AffinityTopologyVersion 
topVer
     ) throws IgniteCheckedException {
         return null;
     }
@@ -61,7 +65,8 @@ public class IgniteCacheSnapshotManager<T extends 
SnapshotOperation> extends Gri
      */
     @Nullable public IgniteInternalFuture startLocalSnapshotOperation(
         UUID initiatorNodeId,
-        T snapshotOperation
+        T snapshotOperation,
+        AffinityTopologyVersion topVer
     ) throws IgniteCheckedException {
         return null;
     }

Reply via email to