Merge remote-tracking branch 'remotes/origin/master' into ignite-2.1.2 # Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f0f59631 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f0f59631 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f0f59631 Branch: refs/heads/ignite-2.1.2-exchange Commit: f0f59631ec05a281af671f6cc246ca3ef443e083 Parents: 4adfca9 8445b31 Author: sboikov <[email protected]> Authored: Wed Jun 28 09:06:23 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jun 28 09:06:23 2017 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 2 +- .../dht/GridDhtPartitionTopologyImpl.java | 153 ++++---- .../GridDhtPartitionsExchangeFuture.java | 24 +- .../DynamicIndexAbstractBasicSelfTest.java | 81 ++++- ...exingComplexClientAtomicPartitionedTest.java | 33 ++ ...dexingComplexClientAtomicReplicatedTest.java | 33 ++ ...mplexClientTransactionalPartitionedTest.java | 33 ++ ...omplexClientTransactionalReplicatedTest.java | 33 ++ ...exingComplexServerAtomicPartitionedTest.java | 33 ++ ...dexingComplexServerAtomicReplicatedTest.java | 33 ++ ...mplexServerTransactionalPartitionedTest.java | 33 ++ ...omplexServerTransactionalReplicatedTest.java | 33 ++ .../index/H2DynamicIndexingComplexTest.java | 356 +++++++++++++++++++ .../cache/index/H2DynamicTableSelfTest.java | 182 ++++++++-- .../IgniteCacheQuerySelfTestSuite.java | 30 +- 15 files changed, 961 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f59631/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f59631/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index ce19c6b,bb6aab3..a6f1831 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@@ -59,7 -60,9 +59,8 @@@ import org.apache.ignite.internal.util. import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; + import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; @@@ -299,91 -383,93 +300,93 @@@ public class GridDhtPartitionTopologyIm * @param updateSeq Update sequence. */ private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { - ClusterNode loc = ctx.localNode(); + List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion()); - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + if (grp.affinityNode()) { + ClusterNode loc = ctx.localNode(); - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); - assert topVer.equals(exchFut.topologyVersion()) : - "Invalid topology [topVer=" + topVer + - ", grp=" + grp.cacheOrGroupName() + - ", futVer=" + exchFut.topologyVersion() + - ", fut=" + exchFut + ']'; - assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : - "Invalid affinity [topVer=" + grp.affinity().lastVersion() + - ", grp=" + grp.cacheOrGroupName() + - ", futVer=" + exchFut.topologyVersion() + - ", fut=" + exchFut + ']'; + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion()); + assert topVer.equals(exchFut.topologyVersion()) : + "Invalid topology [topVer=" + topVer + + ", grp=" + grp.cacheOrGroupName() + + ", futVer=" + exchFut.topologyVersion() + + ", fut=" + exchFut + ']'; + assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : + "Invalid affinity [topVer=" + grp.affinity().lastVersion() + + ", grp=" + grp.cacheOrGroupName() + + ", futVer=" + exchFut.topologyVersion() + + ", fut=" + exchFut + ']'; - int num = grp.affinity().partitions(); + int num = grp.affinity().partitions(); - if (grp.rebalanceEnabled()) { - boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); + if (grp.rebalanceEnabled()) { + boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()); + boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()); - if (first) { - assert exchId.isJoined() || added; + if (first) { + assert exchId.isJoined() || added; - for (int p = 0; p < num; p++) { - if (localNode(p, aff)) { - GridDhtLocalPartition locPart = createPartition(p); + for (int p = 0; p < num; p++) { + if (localNode(p, aff)) { + GridDhtLocalPartition locPart = createPartition(p); - boolean owned = locPart.own(); + boolean owned = locPart.own(); - assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + - ", part=" + locPart + ']'; + assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + + ", part=" + locPart + ']'; - if (log.isDebugEnabled()) - log.debug("Owned partition for oldest node: " + locPart); + if (log.isDebugEnabled()) + log.debug("Owned partition for oldest node: " + locPart); - updateSeq = updateLocal(p, locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); + } } } + else + createPartitions(aff, updateSeq); } - else - createPartitions(aff, updateSeq); - } - else { - // If preloader is disabled, then we simply clear out - // the partitions this node is not responsible for. - for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true, false); + else { + // If preloader is disabled, then we simply clear out + // the partitions this node is not responsible for. + for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); ++ GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true, false); - boolean belongs = localNode(p, aff); + boolean belongs = localNode(p, aff); - if (locPart != null) { - if (!belongs) { - GridDhtPartitionState state = locPart.state(); + if (locPart != null) { + if (!belongs) { + GridDhtPartitionState state = locPart.state(); - if (state.active()) { - locPart.rent(false); + if (state.active()) { + locPart.rent(false); - updateSeq = updateLocal(p, locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); - if (log.isDebugEnabled()) - log.debug("Evicting partition with rebalancing disabled " + - "(it does not belong to affinity): " + locPart); + if (log.isDebugEnabled()) + log.debug("Evicting partition with rebalancing disabled " + + "(it does not belong to affinity): " + locPart); + } } + else + locPart.own(); } - else - locPart.own(); - } - else if (belongs) { - locPart = createPartition(p); + else if (belongs) { + locPart = createPartition(p); - locPart.own(); + locPart.own(); - updateLocal(p, locPart.state(), updateSeq); + updateLocal(p, locPart.state(), updateSeq); + } } } - } - if (node2part != null && node2part.valid()) - checkEvictions(updateSeq, aff); + if (node2part != null && node2part.valid()) + checkEvictions(updateSeq, aff); + } updateRebalanceVersion(aff); } @@@ -430,81 -522,94 +436,84 @@@ ctx.database().checkpointReadLock(); - synchronized (ctx.exchange().interruptLock()) { - if (Thread.currentThread().isInterrupted()) - throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); + try { + synchronized (ctx.exchange().interruptLock()) { + if (Thread.currentThread().isInterrupted()) + throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); - try { U.writeLock(lock); - } - catch (IgniteInterruptedCheckedException e) { - ctx.database().checkpointReadUnlock(); - - throw e; - } - try { - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + try { + - if (stopping) - return; + - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + - topVer + ", exchId=" + exchId + ']'; + if (stopping) + return; - if (exchId.isLeft()) - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - - assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + ++ GridDhtPartitionExchangeId exchId = exchFut.exchangeId();assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + + topVer + ", exchId=" + exchId + ']'; + - if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent()) - removeNode(exchId.nodeId()); ++ if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent()) + removeNode(exchId.nodeId()); - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); - if (log.isDebugEnabled()) - log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); + if (log.isDebugEnabled()) + log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']'); - long updateSeq = this.updateSeq.incrementAndGet(); + long updateSeq = this.updateSeq.incrementAndGet(); - cntrMap.clear(); + cntrMap.clear(); - // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) { - if (node2part == null) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); - boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - - // If this is the oldest node. ++ boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());// If this is the oldest node. + if (oldest != null && (loc.equals(oldest) || grpStarted)) { + if (node2part == null) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); - if (log.isDebugEnabled()) - log.debug("Created brand new full topology map on oldest node [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); - } - else if (!node2part.valid()) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + if (log.isDebugEnabled()) + log.debug("Created brand new full topology map on oldest node [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } + else if (!node2part.valid()) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - if (log.isDebugEnabled()) - log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + - node2part + ']'); - } - else if (!node2part.nodeId().equals(loc.id())) { - node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); + if (log.isDebugEnabled()) + log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" + + node2part + ']'); + } + else if (!node2part.nodeId().equals(loc.id())) { + node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false); - if (log.isDebugEnabled()) - log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + - exchId + ", fullMap=" + fullMapString() + ']'); + if (log.isDebugEnabled()) + log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" + + exchId + ", fullMap=" + fullMapString() + ']'); + } } - } - if (affReady) - initPartitions0(exchFut, updateSeq); - else { - List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); + if (grpStarted || + exchFut.discoveryEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || - exchFut.serverNodeDiscoveryEvent()) { - if (affReady) - initPartitions0(exchFut, updateSeq); - else { - List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); ++ exchFut.serverNodeDiscoveryEvent()) {if (affReady) ++ initPartitions0(exchFut, updateSeq); ++ else { ++ List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); createPartitions(aff, updateSeq); } + } - consistencyCheck(); - - if (log.isDebugEnabled()) - log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + - fullMapString() + ']'); - } - finally { - lock.writeLock().unlock(); + consistencyCheck(); - ctx.database().checkpointReadUnlock(); + if (log.isDebugEnabled()) + log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" + + fullMapString() + ']'); + } + finally { + lock.writeLock().unlock(); + } } } - - // Wait for evictions. - waitForRent(); + finally { + ctx.database().checkpointReadUnlock(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f59631/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f0f59631/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractBasicSelfTest.java ----------------------------------------------------------------------
