IGNITE-7676 Add affinity version to snapshot plugin stub - Fixes #3510. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b6d21fb7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b6d21fb7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b6d21fb7 Branch: refs/heads/ignite-7725 Commit: b6d21fb7d258f4b2b7d705be92a4f9c83e045135 Parents: bcd3881 Author: EdShangGG <[email protected]> Authored: Fri Feb 16 16:29:49 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Feb 16 16:29:49 2018 +0300 ---------------------------------------------------------------------- .../GridDhtPartitionsExchangeFuture.java | 2 +- .../GridCacheDatabaseSharedManager.java | 64 ++++++++++---------- .../snapshot/IgniteCacheSnapshotManager.java | 9 ++- 3 files changed, 41 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/b6d21fb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 73e6417..7a1265e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1136,7 +1136,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { long start = U.currentTimeMillis(); - IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt); + IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt, exchId.topologyVersion()); if (fut != null) { fut.get(); http://git-wip-us.apache.org/repos/asf/ignite/blob/b6d21fb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index e53bd44..fde5fb4 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -2092,54 +2092,56 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @param partStates Partition to restore state. */ public void applyUpdatesOnRecovery( - WALIterator it, + @Nullable WALIterator it, IgnitePredicate<IgniteBiTuple<WALPointer, WALRecord>> recPredicate, IgnitePredicate<DataEntry> entryPredicate, Map<T2<Integer, Integer>, T2<Integer, Long>> partStates ) throws IgniteCheckedException { - while (it.hasNextX()) { - IgniteBiTuple<WALPointer, WALRecord> next = it.nextX(); + if (it != null) { + while (it.hasNextX()) { + IgniteBiTuple<WALPointer, WALRecord> next = it.nextX(); - WALRecord rec = next.get2(); + WALRecord rec = next.get2(); - if (!recPredicate.apply(next)) - break; + if (!recPredicate.apply(next)) + break; - switch (rec.type()) { - case DATA_RECORD: - checkpointReadLock(); + switch (rec.type()) { + case DATA_RECORD: + checkpointReadLock(); - try { - DataRecord dataRec = (DataRecord) rec; + try { + DataRecord dataRec = (DataRecord)rec; - for (DataEntry dataEntry : dataRec.writeEntries()) { - if (entryPredicate.apply(dataEntry)) { - checkpointReadLock(); + for (DataEntry dataEntry : dataRec.writeEntries()) { + if (entryPredicate.apply(dataEntry)) { + checkpointReadLock(); - try { - int cacheId = dataEntry.cacheId(); + try { + int cacheId = dataEntry.cacheId(); - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - if (cacheCtx != null) - applyUpdate(cacheCtx, dataEntry); - else if (log != null) - log.warning("Cache (cacheId=" + cacheId + ") is not started, can't apply updates."); - } - finally { - checkpointReadUnlock(); + if (cacheCtx != null) + applyUpdate(cacheCtx, dataEntry); + else if (log != null) + log.warning("Cache (cacheId=" + cacheId + ") is not started, can't apply updates."); + } + finally { + checkpointReadUnlock(); + } } } } - } - finally { - checkpointReadUnlock(); - } + finally { + checkpointReadUnlock(); + } - break; + break; - default: - // Skip other records. + default: + // Skip other records. + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/b6d21fb7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java index c23adda..16cc8f5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteCacheSnapshotManager.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageMemory; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; @@ -48,9 +49,12 @@ public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends Gri * Try to start local snapshot operation if it's required by discovery event. * * @param discoveryEvent Discovery event. + * @param topVer topology version on the moment when this method was called + * + * @throws IgniteCheckedException if failed */ @Nullable public IgniteInternalFuture tryStartLocalSnapshotOperation( - @Nullable DiscoveryEvent discoveryEvent + @Nullable DiscoveryEvent discoveryEvent, AffinityTopologyVersion topVer ) throws IgniteCheckedException { return null; } @@ -61,7 +65,8 @@ public class IgniteCacheSnapshotManager<T extends SnapshotOperation> extends Gri */ @Nullable public IgniteInternalFuture startLocalSnapshotOperation( UUID initiatorNodeId, - T snapshotOperation + T snapshotOperation, + AffinityTopologyVersion topVer ) throws IgniteCheckedException { return null; }
