IGNITE-7871 Implemented additional synchronization phase for correct partition counters update
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/da77b981 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/da77b981 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/da77b981 Branch: refs/heads/ignite-8201 Commit: da77b9818a70495b7afdf6899ebd9180dadd7f68 Parents: f4de6df Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Wed Apr 11 11:23:46 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Wed Apr 11 11:23:46 2018 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/internal/GridTopic.java | 5 +- .../communication/GridIoMessageFactory.java | 6 + .../discovery/GridDiscoveryManager.java | 10 + .../MetaPageUpdatePartitionDataRecord.java | 2 +- .../processors/cache/CacheMetricsImpl.java | 2 +- .../processors/cache/GridCacheMvccManager.java | 38 + .../GridCachePartitionExchangeManager.java | 17 + .../cache/GridCacheSharedContext.java | 9 +- .../processors/cache/GridCacheUtils.java | 2 +- .../cache/IgniteCacheOffheapManager.java | 8 +- .../cache/IgniteCacheOffheapManagerImpl.java | 10 +- .../dht/GridClientPartitionTopology.java | 5 + .../distributed/dht/GridDhtLocalPartition.java | 9 +- .../dht/GridDhtPartitionTopology.java | 6 + .../dht/GridDhtPartitionTopologyImpl.java | 26 +- .../dht/GridDhtPartitionsStateValidator.java | 255 +++++++ .../cache/distributed/dht/GridDhtTxLocal.java | 5 + .../GridDhtPartitionsExchangeFuture.java | 96 ++- .../GridDhtPartitionsSingleMessage.java | 68 +- .../dht/preloader/InitNewCoordinatorFuture.java | 2 +- .../preloader/latch/ExchangeLatchManager.java | 695 +++++++++++++++++++ .../distributed/dht/preloader/latch/Latch.java | 52 ++ .../dht/preloader/latch/LatchAckMessage.java | 165 +++++ .../cache/distributed/near/GridNearTxLocal.java | 10 + .../persistence/GridCacheOffheapManager.java | 10 +- .../cache/transactions/IgniteTxAdapter.java | 2 +- 
.../cache/transactions/IgniteTxManager.java | 36 +- ...cheDhtLocalPartitionAfterRemoveSelfTest.java | 2 +- .../processors/cache/IgniteCacheGroupsTest.java | 1 + ...ExchangeLatchManagerCoordinatorFailTest.java | 244 +++++++ .../GridCachePartitionsStateValidationTest.java | 316 +++++++++ ...idCachePartitionsStateValidatorSelfTest.java | 158 +++++ .../TxOptimisticOnPartitionExchangeTest.java | 322 +++++++++ .../ignite/testsuites/IgniteCacheTestSuite.java | 4 + .../testsuites/IgniteCacheTestSuite6.java | 6 + 35 files changed, 2568 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index 1227e8c..0b2d41a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -124,7 +124,10 @@ public enum GridTopic { TOPIC_METRICS, /** */ - TOPIC_AUTH; + TOPIC_AUTH, + + /** */ + TOPIC_EXCHANGE; /** Enum values. 
*/ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 5616fd0..581c32e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -53,6 +53,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.WalStateAckMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage; import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.LatchAckMessage; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse; @@ -921,6 +922,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 135: + msg = new LatchAckMessage(); + + break; + // [-3..119] [124..129] [-23..-27] [-36..-55]- this // [120..123] - DR // [-4..-22, -30..-35] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index a1d84e5..400bb5f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -793,6 +793,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { ((IgniteKernal)ctx.grid()).onDisconnected(); + if (!locJoin.isDone()) + locJoin.onDone(new IgniteCheckedException("Node disconnected")); + locJoin = new GridFutureAdapter<>(); registeredCaches.clear(); @@ -2142,6 +2145,13 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { } /** + * @return Local join future. + */ + public GridFutureAdapter<DiscoveryLocalJoinData> localJoinFuture() { + return locJoin; + } + + /** * @param msg Custom message. * @throws IgniteCheckedException If failed. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java index bafbf47..e5bd343 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdatePartitionDataRecord.java @@ -32,7 +32,7 @@ public class MetaPageUpdatePartitionDataRecord extends PageDeltaRecord { /** */ private long globalRmvId; - /** */ + /** TODO: Partition size may be long */ private int partSize; /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 6fae8fe..b402ff2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -792,7 +792,7 @@ public class CacheMetricsImpl implements CacheMetrics { if (cctx.cache() == null) continue; - int cacheSize = part.dataStore().cacheSize(cctx.cacheId()); + long cacheSize = part.dataStore().cacheSize(cctx.cacheId()); offHeapEntriesCnt += cacheSize; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index a9fa3c7..fade833 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -44,6 +44,8 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -314,6 +316,42 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * Creates a future that will wait for finishing all remote transactions (primary -> backup) + * with topology version less or equal to {@code topVer}. + * + * @param topVer Topology version. + * @return Compound future of all {@link GridDhtTxFinishFuture} futures. 
+ */ + public IgniteInternalFuture<?> finishRemoteTxs(AffinityTopologyVersion topVer) { + GridCompoundFuture<?, ?> res = new CacheObjectsReleaseFuture<>("RemoteTx", topVer); + + for (GridCacheFuture<?> fut : futs.values()) { + if (fut instanceof GridDhtTxFinishFuture) { + GridDhtTxFinishFuture finishTxFuture = (GridDhtTxFinishFuture) fut; + + if (cctx.tm().needWaitTransaction(finishTxFuture.tx(), topVer)) + res.add(ignoreErrors(finishTxFuture)); + } + } + + res.markInitialized(); + + return res; + } + + /** + * Future wrapper which ignores any underlying future errors. + * + * @param f Underlying future. + * @return Future wrapper which ignores any underlying future errors. + */ + private IgniteInternalFuture ignoreErrors(IgniteInternalFuture<?> f) { + GridFutureAdapter<?> wrapper = new GridFutureAdapter(); + f.listen(future -> wrapper.onDone()); + return wrapper; + } + + /** * @param leftNodeId Left node ID. * @param topVer Topology version. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 1a0e65f..20a3ccb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.managers.discovery.DiscoveryLocalJoinData; import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.ExchangeLatchManager; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; @@ -216,6 +217,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** For tests only. */ private volatile AffinityTopologyVersion exchMergeTestWaitVer; + /** Distributed latch manager. */ + private ExchangeLatchManager latchMgr; + /** Discovery listener. */ private final DiscoveryEventListener discoLsnr = new DiscoveryEventListener() { @Override public void onEvent(DiscoveryEvent evt, DiscoCache cache) { @@ -309,6 +313,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana exchWorker = new ExchangeWorker(); + latchMgr = new ExchangeLatchManager(cctx.kernalContext()); + cctx.gridEvents().addDiscoveryEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); @@ -1255,6 +1261,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana m.addPartitionUpdateCounters(grp.groupId(), newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); + + m.addPartitionSizes(grp.groupId(), grp.topology().partitionSizes()); } } } @@ -1277,6 +1285,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana m.addPartitionUpdateCounters(top.groupId(), newCntrMap ? cntrsMap : CachePartitionPartialCountersMap.toCountersMap(cntrsMap)); + + m.addPartitionSizes(top.groupId(), top.partitionSizes()); } } @@ -1570,6 +1580,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @return Latch manager instance. 
+ */ + public ExchangeLatchManager latch() { + return latchMgr; + } + + /** * @param exchFut Optional current exchange future. * @throws Exception If failed. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index c2f9229..b3b4f0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -26,7 +26,6 @@ import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerArray; -import java.util.function.BiFunction; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; @@ -711,7 +710,7 @@ public class GridCacheSharedContext<K, V> { /** * @return Ttl cleanup manager. - * */ + */ public GridCacheSharedTtlCleanupManager ttl() { return ttlMgr; } @@ -854,10 +853,14 @@ public class GridCacheSharedContext<K, V> { GridCompoundFuture f = new CacheObjectsReleaseFuture("Partition", topVer); f.add(mvcc().finishExplicitLocks(topVer)); - f.add(tm().finishTxs(topVer)); f.add(mvcc().finishAtomicUpdates(topVer)); f.add(mvcc().finishDataStreamerUpdates(topVer)); + IgniteInternalFuture<?> finishLocalTxsFuture = tm().finishLocalTxs(topVer); + // To properly track progress of finishing local tx updates we explicitly add this future to compound set. 
+ f.add(finishLocalTxsFuture); + f.add(tm().finishAllTxs(finishLocalTxsFuture, topVer)); + f.markInitialized(); return f; http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java index a5169d2..d672420 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java @@ -1732,7 +1732,7 @@ public class GridCacheUtils { ver, expiryPlc == null ? 0 : expiryPlc.forCreate(), expiryPlc == null ? 0 : toExpireTime(expiryPlc.forCreate()), - false, + true, topVer, GridDrType.DR_BACKUP, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 3d83f87..a12c033 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -22,11 +22,11 @@ import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; +import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.RowStore; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.util.GridAtomicLong; @@ -344,7 +344,7 @@ public interface IgniteCacheOffheapManager { * @param part Partition. * @return Number of entries. */ - public int totalPartitionEntriesCount(int part); + public long totalPartitionEntriesCount(int part); /** * @@ -381,7 +381,7 @@ public interface IgniteCacheOffheapManager { * @param cacheId Cache ID. * @return Size. */ - int cacheSize(int cacheId); + long cacheSize(int cacheId); /** * @return Cache sizes if store belongs to group containing multiple caches. @@ -391,7 +391,7 @@ public interface IgniteCacheOffheapManager { /** * @return Total size. */ - int fullSize(); + long fullSize(); /** * @return Update counter. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index b201935..f8cc86f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -252,7 +252,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public int totalPartitionEntriesCount(int p) { + @Override public long totalPartitionEntriesCount(int p) { if (grp.isLocal()) return locCacheDataStore.fullSize(); else { @@ -1152,14 +1152,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public int cacheSize(int cacheId) { + @Override public long cacheSize(int cacheId) { if (grp.sharedGroup()) { AtomicLong size = cacheSizes.get(cacheId); return size != null ? 
(int)size.get() : 0; } - return (int)storageSize.get(); + return storageSize.get(); } /** {@inheritDoc} */ @@ -1176,8 +1176,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public int fullSize() { - return (int)storageSize.get(); + @Override public long fullSize() { + return storageSize.get(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 5bbbb31..3e3bb0d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -1196,6 +1196,11 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public Map<Integer, Long> partitionSizes() { + return Collections.emptyMap(); + } + + /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { assert false : "Should not be called on non-affinity node"; http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 7a47f31..ea20dbf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -929,7 +929,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements /** * @return Initial update counter. */ - public Long initialUpdateCounter() { + public long initialUpdateCounter() { return store.initialUpdateCounter(); } @@ -948,6 +948,13 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** + * @return Total size of all caches. + */ + public long fullSize() { + return store.fullSize(); + } + + /** * Removes all entries and rows from this partition. * * @return Number of rows cleared from page memory. http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 7f900cb..6f68dbb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ 
-345,6 +346,11 @@ public interface GridDhtPartitionTopology { public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros); /** + * @return Partition cache sizes. + */ + public Map<Integer, Long> partitionSizes(); + + /** * @param part Partition to own. * @return {@code True} if owned. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 538c57e..740903e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -31,6 +31,8 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -2526,6 +2528,28 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } /** {@inheritDoc} */ + @Override public Map<Integer, Long> partitionSizes() { + lock.readLock().lock(); + + try { + Map<Integer, Long> partitionSizes = new HashMap<>(); + + for (int p = 0; p < locParts.length(); p++) { + GridDhtLocalPartition part = locParts.get(p); + if (part == null || part.fullSize() == 0) + continue; + + partitionSizes.put(part.id(), part.fullSize()); + } + + return partitionSizes; + } + finally 
{ + lock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { AffinityTopologyVersion curTopVer = this.readyTopVer; @@ -2587,7 +2611,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (part == null) continue; - int size = part.dataStore().fullSize(); + long size = part.dataStore().fullSize(); if (size >= threshold) X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java new file mode 100644 index 0000000..92a0584 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.SB;
+import org.apache.ignite.lang.IgniteProductVersion;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
+
+/**
+ * Validates partition update counters and cache sizes during the exchange process.
+ * <p>
+ * For every partition the first reported value becomes the reference; any owning node reporting
+ * a different value marks the partition as inconsistent.
+ */
+public class GridDhtPartitionsStateValidator {
+    /** Version since node is able to send cache sizes in {@link GridDhtPartitionsSingleMessage}. */
+    private static final IgniteProductVersion SIZES_VALIDATION_AVAILABLE_SINCE = IgniteProductVersion.fromString("2.5.0");
+
+    /** Cache shared context. */
+    private final GridCacheSharedContext<?, ?> cctx;
+
+    /**
+     * Constructor.
+     *
+     * @param cctx Cache shared context.
+     */
+    public GridDhtPartitionsStateValidator(GridCacheSharedContext<?, ?> cctx) {
+        this.cctx = cctx;
+    }
+
+    /**
+     * Validates partition states - update counters and cache sizes for all nodes.
+     * If update counter value or cache size for the same partitions are different on some nodes
+     * method throws exception with full information about inconsistent partitions.
+     * <p>
+     * Nodes that joined within the given exchange are skipped for both checks; nodes older than
+     * {@link #SIZES_VALIDATION_AVAILABLE_SINCE} are additionally skipped for the sizes check.
+     * Update counters are validated first, so a counters failure hides a possible sizes failure.
+     *
+     * @param fut Current exchange future.
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @throws IgniteCheckedException If validation failed. Exception message contains
+     * full information about all partitions which update counters or cache sizes are not consistent.
+     */
+    public void validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture fut,
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages) throws IgniteCheckedException {
+        // Ignore just joined nodes.
+        final Set<UUID> ignoringNodes = new HashSet<>();
+
+        for (DiscoveryEvent evt : fut.events().events()) {
+            if (evt.type() == EVT_NODE_JOINED)
+                ignoringNodes.add(evt.eventNode().id());
+        }
+
+        AffinityTopologyVersion topVer = fut.context().events().topologyVersion();
+
+        // Validate update counters.
+        Map<Integer, Map<UUID, Long>> result = validatePartitionsUpdateCounters(top, messages, ignoringNodes);
+
+        if (!result.isEmpty())
+            throw new IgniteCheckedException("Partitions update counters are inconsistent for " + fold(topVer, result));
+
+        // For sizes validation ignore also nodes which are not able to send cache sizes.
+        for (UUID id : messages.keySet()) {
+            ClusterNode node = cctx.discovery().node(id);
+
+            if (node != null && node.version().compareTo(SIZES_VALIDATION_AVAILABLE_SINCE) < 0)
+                ignoringNodes.add(id);
+        }
+
+        // Validate cache sizes.
+        result = validatePartitionsSizes(top, messages, ignoringNodes);
+
+        if (!result.isEmpty())
+            throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result));
+    }
+
+    /**
+     * Validate partitions update counters for given {@code top}.
+     * <p>
+     * Only partitions in {@link GridDhtPartitionState#OWNING} state on the reporting node are compared.
+     * A partition missing from a node's counters map is treated as having counter {@code 0}.
+     *
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @param ignoringNodes Nodes for what we ignore validation.
+     * @return Invalid partitions map with following structure: (partId, (nodeId, updateCounter)).
+     * If map is empty validation is successful.
+     */
+    Map<Integer, Map<UUID, Long>> validatePartitionsUpdateCounters(
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages,
+        Set<UUID> ignoringNodes) {
+        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+        Map<Integer, T2<UUID, Long>> updateCountersAndNodesByPartitions = new HashMap<>();
+
+        // Populate counters statistics from local node partitions.
+        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+            if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+                continue;
+
+            updateCountersAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.updateCounter()));
+        }
+
+        int partitions = top.partitions();
+
+        // Then process and validate counters from other nodes.
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            if (ignoringNodes.contains(nodeId))
+                continue;
+
+            CachePartitionPartialCountersMap countersMap = e.getValue().partitionUpdateCounters(top.groupId(), partitions);
+
+            for (int part = 0; part < partitions; part++) {
+                if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+                    continue;
+
+                int partIdx = countersMap.partitionIndex(part);
+                long currentCounter = partIdx >= 0 ? countersMap.updateCounterAt(partIdx) : 0;
+
+                process(invalidPartitions, updateCountersAndNodesByPartitions, part, nodeId, currentCounter);
+            }
+        }
+
+        return invalidPartitions;
+    }
+
+    /**
+     * Validate partitions cache sizes for given {@code top}.
+     * <p>
+     * Only partitions in {@link GridDhtPartitionState#OWNING} state on the reporting node are compared.
+     * A partition missing from a node's sizes map is treated as having size {@code 0}.
+     *
+     * @param top Topology to validate.
+     * @param messages Single messages received from all nodes.
+     * @param ignoringNodes Nodes for what we ignore validation.
+     * @return Invalid partitions map with following structure: (partId, (nodeId, cacheSize)).
+     * If map is empty validation is successful.
+     */
+    Map<Integer, Map<UUID, Long>> validatePartitionsSizes(
+        GridDhtPartitionTopology top,
+        Map<UUID, GridDhtPartitionsSingleMessage> messages,
+        Set<UUID> ignoringNodes) {
+        Map<Integer, Map<UUID, Long>> invalidPartitions = new HashMap<>();
+
+        Map<Integer, T2<UUID, Long>> sizesAndNodesByPartitions = new HashMap<>();
+
+        // Populate sizes statistics from local node partitions.
+        for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
+            if (top.partitionState(cctx.localNodeId(), part.id()) != GridDhtPartitionState.OWNING)
+                continue;
+
+            sizesAndNodesByPartitions.put(part.id(), new T2<>(cctx.localNodeId(), part.fullSize()));
+        }
+
+        int partitions = top.partitions();
+
+        // Then process and validate sizes from other nodes.
+        for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : messages.entrySet()) {
+            UUID nodeId = e.getKey();
+
+            if (ignoringNodes.contains(nodeId))
+                continue;
+
+            Map<Integer, Long> sizesMap = e.getValue().partitionSizes(top.groupId());
+
+            for (int part = 0; part < partitions; part++) {
+                if (top.partitionState(nodeId, part) != GridDhtPartitionState.OWNING)
+                    continue;
+
+                // Single lookup instead of containsKey() + get().
+                long currentSize = sizesMap.getOrDefault(part, 0L);
+
+                process(invalidPartitions, sizesAndNodesByPartitions, part, nodeId, currentSize);
+            }
+        }
+
+        return invalidPartitions;
+    }
+
+    /**
+     * Processes given {@code counter} for partition {@code part} reported by {@code node}.
+     * Populates {@code invalidPartitions} map if existing counter and current {@code counter} are different.
+     * <p>
+     * The first value seen for a partition is stored in {@code countersAndNodes} as the reference
+     * and is never replaced; later values are only compared against it.
+     *
+     * @param invalidPartitions Invalid partitions map.
+     * @param countersAndNodes Current map of counters and nodes by partitions.
+     * @param part Processing partition.
+     * @param node Node id.
+     * @param counter Counter value reported by {@code node}.
+     */
+    private void process(Map<Integer, Map<UUID, Long>> invalidPartitions,
+        Map<Integer, T2<UUID, Long>> countersAndNodes,
+        int part,
+        UUID node,
+        long counter) {
+        T2<UUID, Long> existingData = countersAndNodes.get(part);
+
+        // First report for this partition becomes the reference value.
+        if (existingData == null) {
+            countersAndNodes.put(part, new T2<>(node, counter));
+
+            return;
+        }
+
+        if (counter == existingData.get2())
+            return;
+
+        Map<UUID, Long> mismatches = invalidPartitions.get(part);
+
+        // On the first mismatch also record the reference node so the report shows both sides.
+        if (mismatches == null) {
+            mismatches = new HashMap<>();
+            mismatches.put(existingData.get1(), existingData.get2());
+
+            invalidPartitions.put(part, mismatches);
+        }
+
+        mismatches.put(node, counter);
+    }
+
+    /**
+     * Folds given map of invalid partition states to string representation in the following format:
+     * Part [id]: [consistentId=value*]
+     *
+     * Value can be both update counter or cache size.
+     * Partitions are listed in ascending order of their ids.
+     *
+     * @param topVer Last topology version.
+     * @param invalidPartitions Invalid partitions map.
+     * @return String representation of invalid partitions.
+     */
+    private String fold(AffinityTopologyVersion topVer, Map<Integer, Map<UUID, Long>> invalidPartitions) {
+        SB sb = new SB();
+
+        NavigableMap<Integer, Map<UUID, Long>> sortedPartitions = new TreeMap<>(invalidPartitions);
+
+        for (Map.Entry<Integer, Map<UUID, Long>> p : sortedPartitions.entrySet()) {
+            sb.a("Part ").a(p.getKey()).a(": [");
+
+            for (Map.Entry<UUID, Long> e : p.getValue().entrySet()) {
+                ClusterNode node = cctx.discovery().node(topVer, e.getKey());
+
+                // Discovery lookups are null-checked elsewhere in this class; if the node cannot be
+                // resolved here, report its id rather than failing while building the error text.
+                Object consistentId = node != null ? node.consistentId() : e.getKey();
+
+                sb.a(consistentId).a("=").a(e.getValue()).a(" ");
+            }
+
+            sb.a("] ");
+        }
+
+        return sb.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 28cc018..0609f04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -447,6 +447,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa err = e; } + catch (Throwable t) { + fut.onDone(t); + + throw t; + } if (primarySync) sendFinishReply(err); http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index cbb4985..dd4a571 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -41,6 +41,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -75,10 +76,12 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import 
org.apache.ignite.internal.processors.cache.LocalJoinCachesContext; import org.apache.ignite.internal.processors.cache.StateChangeRequest; import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch; import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsStateValidator; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -290,6 +293,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private GridDhtPartitionsExchangeFuture mergedWith; + /** Validator for partition states. */ + @GridToStringExclude + private final GridDhtPartitionsStateValidator validator; + /** * @param cctx Cache context. * @param busyLock Busy lock. @@ -314,6 +321,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte this.exchId = exchId; this.exchActions = exchActions; this.affChangeMsg = affChangeMsg; + this.validator = new GridDhtPartitionsStateValidator(cctx); log = cctx.logger(getClass()); exchLog = cctx.logger(EXCHANGE_LOG); @@ -1099,7 +1107,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte // To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange. 
partHistReserved = cctx.database().reserveHistoryForExchange(); - waitPartitionRelease(); + // On first phase we wait for finishing all local tx updates, atomic updates and lock releases. + waitPartitionRelease(1); + + // Second phase is needed to wait for finishing all tx updates from primary to backup nodes remaining after first phase. + waitPartitionRelease(2); boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null; @@ -1202,9 +1214,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * For the exact list of the objects being awaited for see * {@link GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc. * + * @param phase Phase of partition release. + * * @throws IgniteCheckedException If failed. */ - private void waitPartitionRelease() throws IgniteCheckedException { + private void waitPartitionRelease(int phase) throws IgniteCheckedException { + Latch releaseLatch = null; + + // Wait for other nodes only on first phase. + if (phase == 1) + releaseLatch = cctx.exchange().latch().getOrCreate("exchange", initialVersion()); + IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(initialVersion()); // Assign to class variable so it will be included into toString() method. 
@@ -1238,6 +1258,11 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, futTimeout); } } + catch (IgniteCheckedException e) { + U.warn(log,"Unable to await partitions release future", e); + + throw e; + } } long waitEnd = U.currentTimeMillis(); @@ -1290,6 +1315,35 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } } + + if (releaseLatch == null) + return; + + releaseLatch.countDown(); + + if (!localJoinExchange()) { + try { + while (true) { + try { + releaseLatch.await(futTimeout, TimeUnit.MILLISECONDS); + + if (log.isInfoEnabled()) + log.info("Finished waiting for partitions release latch: " + releaseLatch); + + break; + } + catch (IgniteFutureTimeoutCheckedException ignored) { + U.warn(log, "Unable to await partitions release latch within timeout: " + releaseLatch); + + // Try to resend ack. + releaseLatch.countDown(); + } + } + } + catch (IgniteCheckedException e) { + U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage()); + } + } } /** @@ -2499,6 +2553,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + validatePartitionsState(); + if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) { assert firstDiscoEvt instanceof DiscoveryCustomEvent; @@ -2683,6 +2739,42 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * Validates that partition update counters and cache sizes for all caches are consistent. + */ + private void validatePartitionsState() { + for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) { + CacheGroupDescriptor grpDesc = e.getValue(); + if (grpDesc.config().getCacheMode() == CacheMode.LOCAL) + continue; + + int grpId = e.getKey(); + + CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpId); + + GridDhtPartitionTopology top = grpCtx != null ? 
+ grpCtx.topology() : + cctx.exchange().clientTopology(grpId, events().discoveryCache()); + + // Do not validate read or write through caches or caches with disabled rebalance. + if (grpCtx == null + || grpCtx.config().isReadThrough() + || grpCtx.config().isWriteThrough() + || grpCtx.config().getCacheStoreFactory() != null + || grpCtx.config().getRebalanceDelay() != -1 + || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE) + continue; + + try { + validator.validatePartitionCountersAndSizes(this, top, msgs); + } + catch (IgniteCheckedException ex) { + log.warning("Partition states validation was failed for cache " + grpDesc.cacheOrGroupName(), ex); + // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833 + } + } + } + + /** * */ private void assignPartitionsStates() { http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 215152d..6ebafac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -17,9 +17,9 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; -import java.util.Collection; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -67,6 +67,14 @@ public class 
GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Serialized partitions counters. */ private byte[] partCntrsBytes; + /** Partitions sizes. */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, Map<Integer, Long>> partSizes; + + /** Serialized partitions counters. */ + private byte[] partSizesBytes; + /** Partitions history reservation counters. */ @GridToStringInclude @GridDirectTransient @@ -220,6 +228,35 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** + * Adds partition sizes map for specified {@code grpId} to the current message. + * + * @param grpId Group id. + * @param partSizesMap Partition sizes map. + */ + public void addPartitionSizes(int grpId, Map<Integer, Long> partSizesMap) { + if (partSizesMap.isEmpty()) + return; + + if (partSizes == null) + partSizes = new HashMap<>(); + + partSizes.put(grpId, partSizesMap); + } + + /** + * Returns partition sizes map for specified {@code grpId}. + * + * @param grpId Group id. + * @return Partition sizes map (partId, partSize). + */ + public Map<Integer, Long> partitionSizes(int grpId) { + if (partSizes == null) + return Collections.emptyMap(); + + return partSizes.getOrDefault(grpId, Collections.emptyMap()); + } + + /** * @param grpId Cache group ID. * @param cntrMap Partition history counters. 
*/ @@ -287,12 +324,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes boolean marshal = (parts != null && partsBytes == null) || (partCntrs != null && partCntrsBytes == null) || (partHistCntrs != null && partHistCntrsBytes == null) || + (partSizes != null && partSizesBytes == null) || (err != null && errBytes == null); if (marshal) { byte[] partsBytes0 = null; byte[] partCntrsBytes0 = null; byte[] partHistCntrsBytes0 = null; + byte[] partSizesBytes0 = null; byte[] errBytes0 = null; if (parts != null && partsBytes == null) @@ -304,6 +343,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partHistCntrs != null && partHistCntrsBytes == null) partHistCntrsBytes0 = U.marshal(ctx, partHistCntrs); + if (partSizes != null && partSizesBytes == null) + partSizesBytes0 = U.marshal(ctx, partSizes); + if (err != null && errBytes == null) errBytes0 = U.marshal(ctx, err); @@ -314,11 +356,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes byte[] partsBytesZip = U.zip(partsBytes0); byte[] partCntrsBytesZip = U.zip(partCntrsBytes0); byte[] partHistCntrsBytesZip = U.zip(partHistCntrsBytes0); + byte[] partSizesBytesZip = U.zip(partSizesBytes0); byte[] exBytesZip = U.zip(errBytes0); partsBytes0 = partsBytesZip; partCntrsBytes0 = partCntrsBytesZip; partHistCntrsBytes0 = partHistCntrsBytesZip; + partSizesBytes0 = partSizesBytesZip; errBytes0 = exBytesZip; compressed(true); @@ -331,6 +375,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partsBytes = partsBytes0; partCntrsBytes = partCntrsBytes0; partHistCntrsBytes = partHistCntrsBytes0; + partSizesBytes = partSizesBytes0; errBytes = errBytes0; } } @@ -360,6 +405,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes partHistCntrs = U.unmarshal(ctx, partHistCntrsBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); } + if (partSizesBytes != null && partSizes == 
null) { + if (compressed()) + partSizes = U.unmarshalZip(ctx.marshaller(), partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + else + partSizes = U.unmarshal(ctx, partSizesBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + } + if (errBytes != null && err == null) { if (compressed()) err = U.unmarshalZip(ctx.marshaller(), errBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); @@ -451,6 +503,11 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); + case 13: + if (!writer.writeByteArray("partsSizesBytes", partSizesBytes)) + return false; + + writer.incrementState(); } return true; @@ -531,6 +588,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); + case 13: + partSizesBytes = reader.readByteArray("partsSizesBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); } return reader.afterMessageRead(GridDhtPartitionsSingleMessage.class); @@ -543,7 +607,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 13; + return 14; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java index 596fa8c..42a9ba6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/InitNewCoordinatorFuture.java @@ -235,7 +235,7 @@ public class InitNewCoordinatorFuture extends GridCompoundFuture { if (awaited.remove(node.id())) { GridDhtPartitionsFullMessage fullMsg0 = msg.finishMessage(); - if (fullMsg0 != null) { + if (fullMsg0 != null && fullMsg0.resultTopologyVersion() != null) { assert fullMsg == null || fullMsg.resultTopologyVersion().equals(fullMsg0.resultTopologyVersion()); fullMsg = fullMsg0; http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java new file mode 100644 index 0000000..c205cb1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java @@ -0,0 +1,695 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+
+/**
+ * Creates and manages instances of distributed latches {@link Latch}.
+ * <p>
+ * Latches are identified by a pair (latch id, topology version). The node chosen by
+ * {@code getLatchCoordinator} holds a server latch that collects acks; other nodes hold client
+ * latches that are completed by a final ack. Acks that arrive before the corresponding latch
+ * exists locally are parked in {@link #pendingAcks}.
+ */
+public class ExchangeLatchManager {
+    /** Version since latch management is available. */
+    private static final IgniteProductVersion VERSION_SINCE = IgniteProductVersion.fromString("2.5.0");
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** Context. */
+    private final GridKernalContext ctx;
+
+    /** Discovery manager. */
+    private final GridDiscoveryManager discovery;
+
+    /** IO manager. */
+    private final GridIoManager io;
+
+    /** Current coordinator. Set once local join completes and again when the local node takes over. */
+    private volatile ClusterNode coordinator;
+
+    /** Pending acks collection: acks received for latches that are not created locally yet, by latch key. */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>();
+
+    /** Server latches collection, by latch key (latch id, topology version). */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>();
+
+    /** Client latches collection, by latch key (latch id, topology version). */
+    private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>();
+
+    /** Lock held while latches are created and while acks and node-left events are processed. */
+    private final ReentrantLock lock = new ReentrantLock();
+
+    /**
+     * Constructor.
+     * <p>
+     * On non-client nodes registers the ack message listener, schedules the initial coordinator
+     * lookup for when local join completes, and subscribes to node left/failed discovery events.
+     * Client nodes register nothing.
+     *
+     * @param ctx Kernal context.
+     */
+    public ExchangeLatchManager(GridKernalContext ctx) {
+        this.ctx = ctx;
+        this.log = ctx.log(getClass());
+        this.discovery = ctx.discovery();
+        this.io = ctx.io();
+
+        if (!ctx.clientNode()) {
+            // Latch acks share the exchange topic; every other message type on it is ignored here.
+            ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> {
+                if (msg instanceof LatchAckMessage) {
+                    processAck(nodeId, (LatchAckMessage) msg);
+                }
+            });
+
+            // First coordinator initialization.
+            ctx.discovery().localJoinFuture().listen(f -> {
+                this.coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+            });
+
+            ctx.event().addDiscoveryEventListener((e, cache) -> {
+                assert e != null;
+                assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : this;
+
+                // Do not process from discovery thread.
+                ctx.closure().runLocalSafe(() -> processNodeLeft(e.eventNode()));
+            }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+        }
+    }
+
+    /**
+     * Creates server latch with given {@code id} and {@code topVer}.
+     * Adds corresponding pending acks to it.
+     * If a server latch with the same key already exists, it is returned unchanged.
+     * A latch that is already completed once pending acks are applied is not kept in {@link #serverLatches}.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @param participants Participant nodes.
+     * @return Server latch instance.
+     */
+    private Latch createServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) {
+        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+
+        ServerLatch existing = serverLatches.get(latchId);
+
+        if (existing != null)
+            return existing;
+
+        ServerLatch latch = new ServerLatch(id, topVer, participants);
+
+        serverLatches.put(latchId, latch);
+
+        if (log.isDebugEnabled())
+            log.debug("Server latch is created [latch=" + latchId + ", participantsSize=" + participants.size() + "]");
+
+        // Replay acks that were received before this latch was created.
+        Set<UUID> acks = pendingAcks.get(latchId);
+
+        if (acks != null) {
+            for (UUID node : acks)
+                if (latch.hasParticipant(node) && !latch.hasAck(node))
+                    latch.ack(node);
+
+            pendingAcks.remove(latchId);
+        }
+
+        if (latch.isCompleted())
+            serverLatches.remove(latchId);
+
+        return latch;
+    }
+
+    /**
+     * Creates client latch.
+     * If there is final ack corresponds to given {@code id} and {@code topVer}, latch will be completed immediately.
+     * If a client latch with the same key already exists, it is returned unchanged.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @param coordinator Coordinator node.
+     * @param participants Participant nodes.
+     * @return Client latch instance.
+     */
+    private Latch createClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) {
+        final T2<String, AffinityTopologyVersion> latchId = new T2<>(id, topVer);
+
+        ClientLatch existing = clientLatches.get(latchId);
+
+        if (existing != null)
+            return existing;
+
+        ClientLatch latch = new ClientLatch(id, topVer, coordinator, participants);
+
+        if (log.isDebugEnabled())
+            log.debug("Client latch is created [latch=" + latchId
+                + ", crd=" + coordinator
+                + ", participantsSize=" + participants.size() + "]");
+
+        // There is final ack for created latch.
+        if (pendingAcks.containsKey(latchId)) {
+            latch.complete();
+            pendingAcks.remove(latchId);
+        }
+        else
+            clientLatches.put(latchId, latch);
+
+        return latch;
+    }
+
+    /**
+     * Creates new latch with specified {@code id} and {@code topVer} or returns existing latch.
+     *
+     * Participants of latch are calculated from given {@code topVer} as alive server nodes.
+     * If local node is coordinator {@code ServerLatch} instance will be created, otherwise {@code ClientLatch} instance.
+     * If no coordinator can be found for {@code topVer}, an already completed latch is returned.
+     *
+     * @param id Latch id.
+     * @param topVer Latch topology version.
+     * @return Latch instance.
+     */
+    public Latch getOrCreate(String id, AffinityTopologyVersion topVer) {
+        lock.lock();
+
+        try {
+            ClusterNode coordinator = getLatchCoordinator(topVer);
+
+            if (coordinator == null) {
+                ClientLatch latch = new ClientLatch(id, AffinityTopologyVersion.NONE, null, Collections.emptyList());
+                latch.complete();
+
+                return latch;
+            }
+
+            Collection<ClusterNode> participants = getLatchParticipants(topVer);
+
+            return coordinator.isLocal()
+                ? createServerLatch(id, topVer, participants)
+                : createClientLatch(id, topVer, coordinator, participants);
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Nodes are taken from the current discovery state when {@code topVer} equals
+     * {@link AffinityTopologyVersion#NONE}, otherwise from the discovery cache of {@code topVer}.
+     *
+     * @param topVer Latch topology version.
+     * @return Collection of alive server nodes with latch functionality.
+     */
+    private Collection<ClusterNode> getLatchParticipants(AffinityTopologyVersion topVer) {
+        // Compare by value: a topology version read from a message is not the NONE instance.
+        Collection<ClusterNode> aliveNodes = AffinityTopologyVersion.NONE.equals(topVer)
+            ? discovery.aliveServerNodes()
+            : discovery.discoCache(topVer).aliveServerNodes();
+
+        return aliveNodes
+            .stream()
+            .filter(node -> node.version().compareTo(VERSION_SINCE) >= 0)
+            .collect(Collectors.toList());
+    }
+
+    /**
+     * @param topVer Latch topology version.
+     * @return Oldest alive server node with latch functionality, or {@code null} if there is none.
+     */
+    @Nullable private ClusterNode getLatchCoordinator(AffinityTopologyVersion topVer) {
+        // The coordinator is the first participant, so reuse the participants filter.
+        return getLatchParticipants(topVer)
+            .stream()
+            .findFirst()
+            .orElse(null);
+    }
+
+    /**
+     * Processes ack message from given {@code from} node.
+     *
+     * Completes client latch in case of final ack message.
+     *
+     * If no latch is associated with message, ack is placed to {@link #pendingAcks} set.
+     * A final ack without a matching client latch is dropped when the local node is the coordinator.
+     *
+     * @param from Node sent ack.
+     * @param message Ack message.
+     */
+    private void processAck(UUID from, LatchAckMessage message) {
+        lock.lock();
+
+        try {
+            ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE);
+
+            if (coordinator == null)
+                return;
+
+            T2<String, AffinityTopologyVersion> latchId = new T2<>(message.latchId(), message.topVer());
+
+            if (message.isFinal()) {
+                if (log.isDebugEnabled())
+                    log.debug("Process final ack [latch=" + latchId + ", from=" + from + "]");
+
+                ClientLatch latch = clientLatches.remove(latchId);
+
+                if (latch != null)
+                    latch.complete();
+                else if (!coordinator.isLocal())
+                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>()).add(from);
+            } else {
+                if (log.isDebugEnabled())
+                    log.debug("Process ack [latch=" + latchId + ", from=" + from + "]");
+
+                ServerLatch latch = serverLatches.get(latchId);
+
+                if (latch != null) {
+                    if (latch.hasParticipant(from) && !latch.hasAck(from)) {
+                        latch.ack(from);
+
+                        if (latch.isCompleted())
+                            serverLatches.remove(latchId);
+                    }
+                }
+                else
+                    pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>()).add(from);
+            }
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    /**
+     * Changes coordinator to current local node.
+     * Restores all server latches from pending acks and own client latches.
+ */ + private void becomeNewCoordinator() { + if (log.isInfoEnabled()) + log.info("Become new coordinator " + coordinator.id()); + + List<T2<String, AffinityTopologyVersion>> latchesToRestore = new ArrayList<>(); + latchesToRestore.addAll(pendingAcks.keySet()); + latchesToRestore.addAll(clientLatches.keySet()); + + for (T2<String, AffinityTopologyVersion> latchId : latchesToRestore) { + String id = latchId.get1(); + AffinityTopologyVersion topVer = latchId.get2(); + Collection<ClusterNode> participants = getLatchParticipants(topVer); + + if (!participants.isEmpty()) + createServerLatch(id, topVer, participants); + } + } + + /** + * Handles node left discovery event. + * + * Summary: + * Removes pending acks corresponds to the left node. + * Adds fake acknowledgements to server latches where such node was participant. + * Changes client latches coordinator to oldest available server node where such node was coordinator. + * Detects coordinator change. + * + * @param left Left node. + */ + private void processNodeLeft(ClusterNode left) { + assert this.coordinator != null : "Coordinator is not initialized"; + + lock.lock(); + + try { + if (log.isDebugEnabled()) + log.debug("Process node left " + left.id()); + + ClusterNode coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE); + + if (coordinator == null) + return; + + // Clear pending acks. + for (Map.Entry<T2<String, AffinityTopologyVersion>, Set<UUID>> ackEntry : pendingAcks.entrySet()) + if (ackEntry.getValue().contains(left.id())) + pendingAcks.get(ackEntry.getKey()).remove(left.id()); + + // Change coordinator for client latches. + for (Map.Entry<T2<String, AffinityTopologyVersion>, ClientLatch> latchEntry : clientLatches.entrySet()) { + ClientLatch latch = latchEntry.getValue(); + if (latch.hasCoordinator(left.id())) { + // Change coordinator for latch and re-send ack if necessary. 
+ if (latch.hasParticipant(coordinator.id())) + latch.newCoordinator(coordinator); + else { + /* If new coordinator is not able to take control on the latch, + it means that all other latch participants are left from topology + and there is no reason to track such latch. */ + AffinityTopologyVersion topVer = latchEntry.getKey().get2(); + + assert getLatchParticipants(topVer).isEmpty(); + + latch.complete(new IgniteCheckedException("All latch participants are left from topology.")); + clientLatches.remove(latchEntry.getKey()); + } + } + } + + // Add acknowledgements on behalf of the left node to server latches still waiting for it. + for (Map.Entry<T2<String, AffinityTopologyVersion>, ServerLatch> latchEntry : serverLatches.entrySet()) { + ServerLatch latch = latchEntry.getValue(); + + if (latch.hasParticipant(left.id()) && !latch.hasAck(left.id())) { + if (log.isDebugEnabled()) + log.debug("Process node left [latch=" + latchEntry.getKey() + ", left=" + left.id() + "]"); + + latch.ack(left.id()); + + if (latch.isCompleted()) + serverLatches.remove(latchEntry.getKey()); + } + } + + // Coordinator is changed. Node ids are compared by value: equal ids may be held in distinct UUID instances. + if (coordinator.isLocal() && !this.coordinator.id().equals(coordinator.id())) { + this.coordinator = coordinator; + + becomeNewCoordinator(); + } + } + finally { + lock.unlock(); + } + } + + /** + * Latch creating on coordinator node. + * Latch collects acks from participants: non-coordinator nodes and current local node. + * Latch completes when all acks from all participants are received. + * + * After latch completion final ack is sent to all participants. + */ + class ServerLatch extends CompletableLatch { + /** Number of latch permits. This is needed to track number of countDown invocations. */ + private final AtomicInteger permits; + + /** Set of received acks. */ + private final Set<UUID> acks = new GridConcurrentHashSet<>(); + + /** + * Constructor. + * + * @param id Latch id. + * @param topVer Latch topology version. + * @param participants Participant nodes. 
+ */ + ServerLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) { + super(id, topVer, participants); + this.permits = new AtomicInteger(participants.size()); + + // Send final acks to all alive participants when latch is completed. + this.complete.listen(f -> { + for (ClusterNode node : participants) { + try { + if (discovery.alive(node)) { + io.sendToGridTopic(node, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, true), GridIoPolicy.SYSTEM_POOL); + + if (log.isDebugEnabled()) + log.debug("Final ack is sent [latch=" + latchId() + ", to=" + node.id() + "]"); + } + } catch (IgniteCheckedException e) { + // Best effort: a failed send is only logged, remaining participants are still notified. + if (log.isDebugEnabled()) + log.debug("Unable to send final ack [latch=" + latchId() + ", to=" + node.id() + "]: " + e.getMessage()); + } + } + }); + } + + /** + * Checks if latch has ack from given node. + * + * @param from Node id. + * @return {@code true} if latch has ack from given node. + */ + private boolean hasAck(UUID from) { + return acks.contains(from); + } + + /** + * Receives ack from given node. + * Counts down latch if ack was not already processed. + * + * @param from Node id. + */ + private void ack(UUID from) { + if (log.isDebugEnabled()) + log.debug("Ack is accepted [latch=" + latchId() + ", from=" + from + "]"); + + countDown0(from); + } + + /** + * Count down latch from ack of given node. + * Completes latch if all acks are received. + * + * @param node Node. 
+ */ + private void countDown0(UUID node) { + if (isCompleted() || acks.contains(node)) + return; + + acks.add(node); + + int remaining = permits.decrementAndGet(); + + if (log.isDebugEnabled()) + log.debug("Count down + [latch=" + latchId() + ", remaining=" + remaining + "]"); + + if (remaining == 0) + complete(); + } + + /** {@inheritDoc} */ + @Override public void countDown() { + countDown0(ctx.localNodeId()); + } + + /** {@inheritDoc} */ + @Override public String toString() { + Set<UUID> pendingAcks = participants.stream().filter(ack -> !acks.contains(ack)).collect(Collectors.toSet()); + + return S.toString(ServerLatch.class, this, + "pendingAcks", pendingAcks, + "super", super.toString()); + } + } + + /** + * Latch creating on non-coordinator node. + * Latch completes when final ack from coordinator is received. + */ + class ClientLatch extends CompletableLatch { + /** Latch coordinator node. Can be changed if coordinator is left from topology. */ + private volatile ClusterNode coordinator; + + /** Flag indicates that ack is sent to coordinator. */ + private boolean ackSent; + + /** + * Constructor. + * + * @param id Latch id. + * @param topVer Latch topology version. + * @param coordinator Coordinator node. + * @param participants Participant nodes. + */ + ClientLatch(String id, AffinityTopologyVersion topVer, ClusterNode coordinator, Collection<ClusterNode> participants) { + super(id, topVer, participants); + + this.coordinator = coordinator; + } + + /** + * Checks if latch coordinator is given {@code node}. + * + * @param node Node. + * @return {@code true} if latch coordinator is given node. + */ + private boolean hasCoordinator(UUID node) { + return coordinator.id().equals(node); + } + + /** + * Changes coordinator of latch and resends ack to new coordinator if needed. + * + * @param coordinator New coordinator. 
+ */ + private void newCoordinator(ClusterNode coordinator) { + if (log.isDebugEnabled()) + log.debug("Coordinator is changed [latch=" + latchId() + ", crd=" + coordinator.id() + "]"); + + synchronized (this) { + this.coordinator = coordinator; + + // Resend ack to new coordinator. + if (ackSent) + sendAck(); + } + } + + /** + * Sends ack to coordinator node and marks the ack as sent. + * There is ack deduplication on coordinator. So it's fine to send same ack twice. + */ + private void sendAck() { + try { + // Set before sending: if the send fails, the ack is resent once a new coordinator is assigned. + ackSent = true; + + io.sendToGridTopic(coordinator, GridTopic.TOPIC_EXCHANGE, new LatchAckMessage(id, topVer, false), GridIoPolicy.SYSTEM_POOL); + + if (log.isDebugEnabled()) + log.debug("Ack is sent [latch=" + latchId() + ", to=" + coordinator.id() + "]"); + } catch (IgniteCheckedException e) { + // Coordinator is unreachable. On coordinator node left discovery event ack will be resent. + if (log.isDebugEnabled()) + log.debug("Unable to send ack [latch=" + latchId() + ", to=" + coordinator.id() + "]: " + e.getMessage()); + } + } + + /** {@inheritDoc} */ + @Override public void countDown() { + if (isCompleted()) + return; + + // Synchronize in case of changed coordinator. + synchronized (this) { + sendAck(); + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ClientLatch.class, this, + "super", super.toString()); + } + } + + /** + * Base latch functionality with implemented complete / await logic. + */ + private abstract static class CompletableLatch implements Latch { + /** Latch id. */ + @GridToStringInclude + protected final String id; + + /** Latch topology version. */ + @GridToStringInclude + protected final AffinityTopologyVersion topVer; + + /** Latch node participants. Only participant nodes are able to change state of latch. */ + @GridToStringExclude + protected final Set<UUID> participants; + + /** Future indicates that latch is completed. 
*/ + @GridToStringExclude + protected final GridFutureAdapter<?> complete = new GridFutureAdapter<>(); + + /** + * Constructor. + * + * @param id Latch id. + * @param topVer Latch topology version. + * @param participants Participant nodes. Only their ids are retained by the latch. + */ + CompletableLatch(String id, AffinityTopologyVersion topVer, Collection<ClusterNode> participants) { + this.id = id; + this.topVer = topVer; + this.participants = participants.stream().map(ClusterNode::id).collect(Collectors.toSet()); + } + + /** {@inheritDoc} */ + @Override public void await() throws IgniteCheckedException { + complete.get(); + } + + /** {@inheritDoc} */ + @Override public void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException { + complete.get(timeout, timeUnit); + } + + /** + * Checks if latch participants contain given {@code node}. + * + * @param node Node id. + * @return {@code true} if latch participants contain given node. + */ + boolean hasParticipant(UUID node) { + return participants.contains(node); + } + + /** + * @return {@code true} if latch is completed, i.e. the completion future is done. + */ + boolean isCompleted() { + return complete.isDone(); + } + + /** + * Completes current latch. + */ + void complete() { + complete.onDone(); + } + + /** + * Completes current latch with given {@code error}. + * + * @param error Error. + */ + void complete(Throwable error) { + complete.onDone(error); + } + + /** + * @return Full latch id: latch id and topology version joined by a dash. 
+ */ + String latchId() { + return id + "-" + topVer; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CompletableLatch.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/da77b981/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java new file mode 100644 index 0000000..9704c2e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/Latch.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch; + +import java.util.concurrent.TimeUnit; +import org.apache.ignite.IgniteCheckedException; + +/** + * Simple distributed count down latch interface. + * Latch supports count down and await logic. 
+ * Latch functionality does not rely on caches and has its own state management, see {@link ExchangeLatchManager}. + */ +public interface Latch { + /** + * Decrements count on current latch. + * Releases all latch waiters on all nodes if count reaches zero. + * + * This is an idempotent operation. Invoking this method twice or more on the same node doesn't have any effect. + */ + void countDown(); + + /** + * Awaits current latch completion. + * + * @throws IgniteCheckedException If await has failed. + */ + void await() throws IgniteCheckedException; + + /** + * Awaits current latch completion with specified timeout. + * + * @param timeout Timeout value. + * @param timeUnit Timeout time unit. + * @throws IgniteCheckedException If await has failed. + */ + void await(long timeout, TimeUnit timeUnit) throws IgniteCheckedException; +}