ignite-5595 Client event exchange optimization
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8445b315 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8445b315 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8445b315 Branch: refs/heads/ignite-2.1.2-exchange Commit: 8445b315663710507e6e3996223f01748f9674a6 Parents: ff151e5 Author: sboikov <[email protected]> Authored: Wed Jun 28 08:58:09 2017 +0300 Committer: sboikov <[email protected]> Committed: Wed Jun 28 08:58:09 2017 +0300 ---------------------------------------------------------------------- .../dht/GridClientPartitionTopology.java | 2 +- .../dht/GridDhtPartitionTopologyImpl.java | 147 ++++++++++--------- .../GridDhtPartitionsExchangeFuture.java | 24 +-- 3 files changed, 95 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8445b315/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java index 9c769a9..20a7f91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java @@ -269,7 +269,7 @@ public class GridClientPartitionTopology implements GridDhtPartitionTopology { assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; - if (!exchId.isJoined()) + if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent()) removeNode(exchId.nodeId()); // In case if node joins, get topology at the time of joining node. http://git-wip-us.apache.org/repos/asf/ignite/blob/8445b315/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index f3170fa..bb6aab3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -62,6 +62,7 @@ import org.jetbrains.annotations.Nullable; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; @@ -208,6 +209,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @throws IgniteCheckedException If failed. */ private boolean waitForRent() throws IgniteCheckedException { + if (!grp.affinityNode()) + return false; + final long longOpDumpTimeout = IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, 60_000); @@ -379,91 +383,93 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param updateSeq Update sequence. */ private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { - ClusterNode loc = ctx.localNode(); + List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion()); - ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); + if (grp.affinityNode()) { + ClusterNode loc = ctx.localNode(); - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); - assert topVer.equals(exchFut.topologyVersion()) : - "Invalid topology [topVer=" + topVer + - ", grp=" + grp.cacheOrGroupName() + - ", futVer=" + exchFut.topologyVersion() + - ", fut=" + exchFut + ']'; - assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : - "Invalid affinity [topVer=" + grp.affinity().lastVersion() + - ", grp=" + grp.cacheOrGroupName() + - ", futVer=" + exchFut.topologyVersion() + - ", fut=" + exchFut + ']'; + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - List<List<ClusterNode>> aff = grp.affinity().assignments(exchFut.topologyVersion()); + assert topVer.equals(exchFut.topologyVersion()) : + "Invalid topology [topVer=" + topVer + + ", grp=" + grp.cacheOrGroupName() + + ", futVer=" + exchFut.topologyVersion() + + ", fut=" + exchFut + ']'; + assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : + "Invalid affinity [topVer=" + grp.affinity().lastVersion() + + ", grp=" + grp.cacheOrGroupName() + + ", futVer=" + exchFut.topologyVersion() + + ", fut=" + exchFut + ']'; - int num = grp.affinity().partitions(); + int num = grp.affinity().partitions(); - if (grp.rebalanceEnabled()) { - boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); + if (grp.rebalanceEnabled()) { + boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); - boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()); + boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()); - if (first) { - assert exchId.isJoined() || added; + if (first) { + assert exchId.isJoined() || added; - for (int p = 0; p < num; p++) { - if (localNode(p, aff)) { - GridDhtLocalPartition locPart = createPartition(p); + for (int p = 0; p < num; p++) { + if (localNode(p, aff)) { + GridDhtLocalPartition locPart = createPartition(p); - boolean owned = locPart.own(); + boolean owned = locPart.own(); - assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + - ", part=" + locPart + ']'; + assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + + ", part=" + locPart + ']'; - if (log.isDebugEnabled()) - log.debug("Owned partition for oldest node: " + locPart); + if (log.isDebugEnabled()) + log.debug("Owned partition for oldest node: " + locPart); - updateSeq = updateLocal(p, locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); + } } } + else + createPartitions(aff, updateSeq); } - else - createPartitions(aff, updateSeq); - } - else { - // If preloader is disabled, then we simply clear out - // the partitions this node is not responsible for. - for (int p = 0; p < num; p++) { - GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); + else { + // If preloader is disabled, then we simply clear out + // the partitions this node is not responsible for. + for (int p = 0; p < num; p++) { + GridDhtLocalPartition locPart = localPartition(p, topVer, false, false); - boolean belongs = localNode(p, aff); + boolean belongs = localNode(p, aff); - if (locPart != null) { - if (!belongs) { - GridDhtPartitionState state = locPart.state(); + if (locPart != null) { + if (!belongs) { + GridDhtPartitionState state = locPart.state(); - if (state.active()) { - locPart.rent(false); + if (state.active()) { + locPart.rent(false); - updateSeq = updateLocal(p, locPart.state(), updateSeq); + updateSeq = updateLocal(p, locPart.state(), updateSeq); - if (log.isDebugEnabled()) - log.debug("Evicting partition with rebalancing disabled " + - "(it does not belong to affinity): " + locPart); + if (log.isDebugEnabled()) + log.debug("Evicting partition with rebalancing disabled " + + "(it does not belong to affinity): " + locPart); + } } + else + locPart.own(); } - else - locPart.own(); - } - else if (belongs) { - locPart = createPartition(p); + else if (belongs) { + locPart = createPartition(p); - locPart.own(); + locPart.own(); - updateLocal(p, locPart.state(), updateSeq); + updateLocal(p, locPart.state(), updateSeq); + } } } - } - if (node2part != null && node2part.valid()) - checkEvictions(updateSeq, aff); + if (node2part != null && node2part.valid()) + checkEvictions(updateSeq, aff); + } updateRebalanceVersion(aff); } @@ -473,6 +479,9 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { * @param updateSeq Update sequence. */ private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) { + if (!grp.affinityNode()) + return; + int num = grp.affinity().partitions(); for (int p = 0; p < num; p++) { @@ -527,15 +536,15 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } try { - GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); - if (stopping) return; + GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); + assert topVer.equals(exchId.topologyVersion()) : "Invalid topology version [topVer=" + topVer + ", exchId=" + exchId + ']'; - if (exchId.isLeft()) + if (exchId.isLeft() && exchFut.serverNodeDiscoveryEvent()) removeNode(exchId.nodeId()); ClusterNode oldest = discoCache.oldestAliveServerNodeWithCache(); @@ -547,8 +556,10 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { cntrMap.clear(); + boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); + // If this is the oldest node. - if (oldest != null && (loc.equals(oldest) || exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()))) { + if (oldest != null && (loc.equals(oldest) || grpStarted)) { if (node2part == null) { node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); @@ -572,12 +583,16 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { } } - if (affReady) - initPartitions0(exchFut, updateSeq); - else { - List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); + if (grpStarted || + exchFut.discoveryEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || + exchFut.serverNodeDiscoveryEvent()) { + if (affReady) + initPartitions0(exchFut, updateSeq); + else { + List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); - createPartitions(aff, updateSeq); + createPartitions(aff, updateSeq); + } } consistencyCheck(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8445b315/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index c8138f7..1f13e56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -1150,7 +1150,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @return {@code True} if exchange triggered by server node join or fail. */ - private boolean serverNodeDiscoveryEvent() { + public boolean serverNodeDiscoveryEvent() { assert discoEvt != null; return discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT && !CU.clientNode(discoEvt.eventNode()); @@ -1173,16 +1173,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (err == null && realExchange) { - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - if (grp.isLocal()) - continue; + if (centralizedAff) { + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + if (grp.isLocal()) + continue; - try { - if (centralizedAff) + try { grp.topology().initPartitions(this); - } - catch (IgniteInterruptedCheckedException e) { - U.error(log, "Failed to initialize partitions.", e); + } + catch (IgniteInterruptedCheckedException e) { + U.error(log, "Failed to initialize partitions.", e); + } } } @@ -1199,9 +1200,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (discoEvt.type() == EVT_NODE_LEFT || + if (serverNodeDiscoveryEvent() && + (discoEvt.type() == EVT_NODE_LEFT || discoEvt.type() == EVT_NODE_FAILED || - discoEvt.type() == EVT_NODE_JOINED) + discoEvt.type() == EVT_NODE_JOINED)) detectLostPartitions(); Map<Integer, CacheValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
