http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java index 9b099ae..835910e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionSupplier.java @@ -35,9 +35,9 @@ import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware; import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware; @@ -50,7 +50,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.IgniteSpiException; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; /** * Class for supplying partitions to demanding nodes. @@ -107,9 +107,7 @@ class GridDhtPartitionSupplier { * @param sc Supply context. * @param log Logger. */ - private static void clearContext( - final SupplyContext sc, - final IgniteLogger log) { + private static void clearContext(SupplyContext sc, IgniteLogger log) { if (sc != null) { final IgniteRebalanceIterator it = sc.iterator; @@ -144,7 +142,7 @@ class GridDhtPartitionSupplier { it.remove(); if (log.isDebugEnabled()) - log.debug("Supply context removed [node=" + t.get1() + "]"); + log.debug("Supply context removed [grp=" + grp.cacheOrGroupName() + ", demander=" + t.get1() + "]"); } } } @@ -169,36 +167,29 @@ class GridDhtPartitionSupplier { * * @param topicId Id of the topic is used for the supply-demand communication. * @param nodeId Id of the node which sent the demand message. - * @param d Demand message. + * @param demandMsg Demand message. */ @SuppressWarnings("unchecked") - public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemandMessage d) { - assert d != null; + public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemandMessage demandMsg) { + assert demandMsg != null; assert nodeId != null; - AffinityTopologyVersion curTop = grp.affinity().lastVersion(); - AffinityTopologyVersion demTop = d.topologyVersion(); + T3<UUID, Integer, AffinityTopologyVersion> contextId = new T3<>(nodeId, topicId, demandMsg.topologyVersion()); - T3<UUID, Integer, AffinityTopologyVersion> contextId = new T3<>(nodeId, topicId, demTop); - - if (d.rebalanceId() < 0) { // Demand node requested context cleanup. + if (demandMsg.rebalanceId() < 0) { // Demand node requested context cleanup. synchronized (scMap) { SupplyContext sctx = scMap.get(contextId); - if (sctx != null && sctx.rebalanceId == -d.rebalanceId()) { + if (sctx != null && sctx.rebalanceId == -demandMsg.rebalanceId()) { clearContext(scMap.remove(contextId), log); if (log.isDebugEnabled()) - log.debug("Supply context cleaned [grp=" + grp.cacheOrGroupName() - + ", from=" + nodeId - + ", demandMsg=" + d + log.debug("Supply context cleaned [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", supplyContext=" + sctx + "]"); } else { if (log.isDebugEnabled()) - log.debug("Stale supply context cleanup message [grp=" + grp.cacheOrGroupName() - + ", from=" + nodeId - + ", demandMsg=" + d + log.debug("Stale supply context cleanup message [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", supplyContext=" + sctx + "]"); } @@ -206,17 +197,15 @@ class GridDhtPartitionSupplier { } } - if (log.isDebugEnabled()) - log.debug("Demand request accepted [grp=" + grp.cacheOrGroupName() - + ", from=" + nodeId - + ", currentVer=" + curTop - + ", demandedVer=" + demTop - + ", topicId=" + topicId + "]"); + ClusterNode demanderNode = grp.shared().discovery().node(nodeId); - ClusterNode node = grp.shared().discovery().node(nodeId); + if (demanderNode == null) { + if (log.isDebugEnabled()) + log.debug("Demand message rejected (demander left cluster) [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]"); - if (node == null) return; + } IgniteRebalanceIterator iter = null; @@ -226,62 +215,59 @@ class GridDhtPartitionSupplier { synchronized (scMap) { sctx = scMap.remove(contextId); - if (sctx != null && d.rebalanceId() < sctx.rebalanceId) { + if (sctx != null && demandMsg.rebalanceId() < sctx.rebalanceId) { // Stale message, return context back and return. scMap.put(contextId, sctx); if (log.isDebugEnabled()) - log.debug("Stale demand message [cache=" + grp.cacheOrGroupName() - + ", actualContext=" + sctx - + ", from=" + nodeId - + ", demandMsg=" + d + "]"); + log.debug("Stale demand message [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + + ", actualContext=" + sctx + "]"); return; } } // Demand request should not contain empty partitions if no supply context is associated with it. - if (sctx == null && (d.partitions() == null || d.partitions().isEmpty())) { + if (sctx == null && (demandMsg.partitions() == null || demandMsg.partitions().isEmpty())) { if (log.isDebugEnabled()) - log.debug("Empty demand message [cache=" + grp.cacheOrGroupName() - + ", from=" + nodeId - + ", topicId=" + topicId - + ", demandMsg=" + d + "]"); + log.debug("Empty demand message (no context and partitions) [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]"); return; } - assert !(sctx != null && !d.partitions().isEmpty()); + if (log.isDebugEnabled()) + log.debug("Demand message accepted [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]"); - long batchesCnt = 0; + assert !(sctx != null && !demandMsg.partitions().isEmpty()); long maxBatchesCnt = grp.config().getRebalanceBatchesPrefetchCount(); - if (sctx != null) { - maxBatchesCnt = 1; - } - else { + if (sctx == null) { if (log.isDebugEnabled()) - log.debug("Starting supplying rebalancing [cache=" + grp.cacheOrGroupName() + - ", fromNode=" + node.id() + ", partitionsCount=" + d.partitions().size() + - ", topology=" + demTop + ", rebalanceId=" + d.rebalanceId() + - ", topicId=" + topicId + "]"); + log.debug("Starting supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + + ", fullPartitions=" + S.compact(demandMsg.partitions().fullSet()) + + ", histPartitions=" + S.compact(demandMsg.partitions().historicalSet()) + "]"); } + else + maxBatchesCnt = 1; - GridDhtPartitionSupplyMessage s = new GridDhtPartitionSupplyMessage( - d.rebalanceId(), + GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage( + demandMsg.rebalanceId(), grp.groupId(), - d.topologyVersion(), - grp.deploymentEnabled()); + demandMsg.topologyVersion(), + grp.deploymentEnabled() + ); Set<Integer> remainingParts; if (sctx == null || sctx.iterator == null) { - iter = grp.offheap().rebalanceIterator(d.partitions(), d.topologyVersion()); + iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion()); - remainingParts = new HashSet<>(d.partitions().fullSet()); + remainingParts = new HashSet<>(demandMsg.partitions().fullSet()); - CachePartitionPartialCountersMap histMap = d.partitions().historicalMap(); + CachePartitionPartialCountersMap histMap = demandMsg.partitions().historicalMap(); for (int i = 0; i < histMap.size(); i++) { int p = histMap.partitionAt(i); @@ -289,16 +275,16 @@ class GridDhtPartitionSupplier { remainingParts.add(p); } - for (Integer part : d.partitions().fullSet()) { + for (Integer part : demandMsg.partitions().fullSet()) { if (iter.isPartitionMissing(part)) continue; - GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); + GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false); assert loc != null && loc.state() == GridDhtPartitionState.OWNING : "Partition should be in OWNING state: " + loc; - s.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part)); + supplyMsg.addEstimatedKeysCount(grp.offheap().totalPartitionEntriesCount(part)); } for (int i = 0; i < histMap.size(); i++) { @@ -307,7 +293,7 @@ class GridDhtPartitionSupplier { if (iter.isPartitionMissing(p)) continue; - s.addEstimatedKeysCount(histMap.updateCounterAt(i) - histMap.initialUpdateCounterAt(i)); + supplyMsg.addEstimatedKeysCount(histMap.updateCounterAt(i) - histMap.initialUpdateCounterAt(i)); } } else { @@ -316,28 +302,30 @@ class GridDhtPartitionSupplier { remainingParts = sctx.remainingParts; } - final int messageMaxSize = grp.config().getRebalanceBatchSize(); + final int msgMaxSize = grp.config().getRebalanceBatchSize(); + + long batchesCnt = 0; while (iter.hasNext()) { - if (s.messageSize() >= messageMaxSize) { + if (supplyMsg.messageSize() >= msgMaxSize) { if (++batchesCnt >= maxBatchesCnt) { saveSupplyContext(contextId, iter, remainingParts, - d.rebalanceId() + demandMsg.rebalanceId() ); - reply(node, d, s, contextId); + reply(topicId, demanderNode, demandMsg, supplyMsg, contextId); return; } else { - if (!reply(node, d, s, contextId)) + if (!reply(topicId, demanderNode, demandMsg, supplyMsg, contextId)) return; - s = new GridDhtPartitionSupplyMessage(d.rebalanceId(), + supplyMsg = new GridDhtPartitionSupplyMessage(demandMsg.rebalanceId(), grp.groupId(), - d.topologyVersion(), + demandMsg.topologyVersion(), grp.deploymentEnabled()); } } @@ -346,19 +334,19 @@ class GridDhtPartitionSupplier { int part = row.partition(); - GridDhtLocalPartition loc = top.localPartition(part, d.topologyVersion(), false); + GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false); assert (loc != null && loc.state() == OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part) : "Partition should be in OWNING state and has at least 1 reservation " + loc; if (iter.isPartitionMissing(part) && remainingParts.contains(part)) { - s.missed(part); + supplyMsg.missed(part); remainingParts.remove(part); if (log.isDebugEnabled()) - log.debug("Requested partition is marked as missing on local node [part=" + part + - ", demander=" + nodeId + ']'); + log.debug("Requested partition is marked as missing [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", p=" + part + "]"); continue; } @@ -400,7 +388,7 @@ class GridDhtPartitionSupplier { info.expireTime(row.expireTime()); if (preloadPred == null || preloadPred.apply(info)) - s.addEntry0(part, iter.historical(part), info, grp.shared(), grp.cacheObjectContext()); + supplyMsg.addEntry0(part, iter.historical(part), info, grp.shared(), grp.cacheObjectContext()); else { if (log.isTraceEnabled()) log.trace("Rebalance predicate evaluated to false (will not send " + @@ -408,7 +396,7 @@ class GridDhtPartitionSupplier { } if (iter.isPartitionDone(part)) { - s.last(part, loc.updateCounter()); + supplyMsg.last(part, loc.updateCounter()); remainingParts.remove(part); } @@ -420,17 +408,17 @@ class GridDhtPartitionSupplier { int p = remainingIter.next(); if (iter.isPartitionDone(p)) { - GridDhtLocalPartition loc = top.localPartition(p, d.topologyVersion(), false); + GridDhtLocalPartition loc = top.localPartition(p, demandMsg.topologyVersion(), false); assert loc != null : "Supply partition is gone: grp=" + grp.cacheOrGroupName() + ", p=" + p; - s.last(p, loc.updateCounter()); + supplyMsg.last(p, loc.updateCounter()); remainingIter.remove(); } else if (iter.isPartitionMissing(p)) { - s.missed(p); + supplyMsg.missed(p); remainingIter.remove(); } @@ -444,32 +432,28 @@ class GridDhtPartitionSupplier { else iter.close(); - reply(node, d, s, contextId); + reply(topicId, demanderNode, demandMsg, supplyMsg, contextId); - if (log.isDebugEnabled()) - log.debug("Finished supplying rebalancing [cache=" + grp.cacheOrGroupName() + - ", fromNode=" + node.id() + - ", topology=" + demTop + ", rebalanceId=" + d.rebalanceId() + - ", topicId=" + topicId + "]"); + if (log.isInfoEnabled()) + log.info("Finished supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]"); } catch (Throwable t) { if (grp.shared().kernalContext().isStopping()) return; // Sending supply messages with error requires new protocol. - boolean sendErrMsg = node.version().compareTo(GridDhtPartitionSupplyMessageV2.AVAILABLE_SINCE) >= 0; + boolean sendErrMsg = demanderNode.version().compareTo(GridDhtPartitionSupplyMessageV2.AVAILABLE_SINCE) >= 0; if (t instanceof IgniteSpiException) { if (log.isDebugEnabled()) - log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() + - ", msg=" + t.getMessage() + ']'); + log.debug("Failed to send message to node (current node is stopping?) [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + ", msg=" + t.getMessage() + ']'); sendErrMsg = false; } else - U.error(log, "Failed to continue supplying process for " + - "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId - + ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t); + U.error(log, "Failed to continue supplying [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t); try { if (sctx != null) @@ -478,9 +462,8 @@ class GridDhtPartitionSupplier { iter.close(); } catch (Throwable t1) { - U.error(log, "Failed to cleanup supplying context " + - "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId - + ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t1); + U.error(log, "Failed to cleanup supplying context [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1); } if (!sendErrMsg) @@ -488,19 +471,18 @@ class GridDhtPartitionSupplier { try { GridDhtPartitionSupplyMessageV2 errMsg = new GridDhtPartitionSupplyMessageV2( - d.rebalanceId(), + demandMsg.rebalanceId(), grp.groupId(), - d.topologyVersion(), + demandMsg.topologyVersion(), grp.deploymentEnabled(), t ); - reply(node, d, errMsg, contextId); + reply(topicId, demanderNode, demandMsg, errMsg, contextId); } catch (Throwable t1) { - U.error(log, "Failed to send supply error message for " + - "[cache=" + grp.cacheOrGroupName() + ", node=" + nodeId - + ", topicId=" + contextId.get2() + ", topVer=" + contextId.get3() + "]", t1); + U.error(log, "Failed to send supply error message [" + + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]", t1); } } } @@ -508,23 +490,25 @@ class GridDhtPartitionSupplier { /** * Sends supply message to demand node. * - * @param node Recipient of supply message. - * @param d Demand message. - * @param s Supply message. + * @param demander Recipient of supply message. + * @param demandMsg Demand message. + * @param supplyMsg Supply message. * @param contextId Supply context id. * @return {@code True} if message was sent, {@code false} if recipient left grid. * @throws IgniteCheckedException If failed. */ - private boolean reply(ClusterNode node, - GridDhtPartitionDemandMessage d, - GridDhtPartitionSupplyMessage s, - T3<UUID, Integer, AffinityTopologyVersion> contextId) - throws IgniteCheckedException { + private boolean reply( + int topicId, + ClusterNode demander, + GridDhtPartitionDemandMessage demandMsg, + GridDhtPartitionSupplyMessage supplyMsg, + T3<UUID, Integer, AffinityTopologyVersion> contextId + ) throws IgniteCheckedException { try { if (log.isDebugEnabled()) - log.debug("Replying to partition demand [node=" + node.id() + ", demand=" + d + ", supply=" + s + ']'); + log.debug("Send next supply message [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]"); - grp.shared().io().sendOrderedMessage(node, d.topic(), s, grp.ioPolicy(), d.timeout()); + grp.shared().io().sendOrderedMessage(demander, demandMsg.topic(), supplyMsg, grp.ioPolicy(), demandMsg.timeout()); // Throttle preloading. if (grp.config().getRebalanceThrottle() > 0) @@ -534,7 +518,7 @@ class GridDhtPartitionSupplier { } catch (ClusterTopologyCheckedException ignore) { if (log.isDebugEnabled()) - log.debug("Failed to send partition supply message because node left grid: " + node.id()); + log.debug("Failed to send supply message (demander left): [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]"); synchronized (scMap) { clearContext(scMap.remove(contextId), log); @@ -545,6 +529,17 @@ class GridDhtPartitionSupplier { } /** + * String representation of supply routine. + * + * @param topicId Topic id. + * @param demander Demander. + * @param demandMsg Demand message. + */ + private String supplyRoutineInfo(int topicId, UUID demander, GridDhtPartitionDemandMessage demandMsg) { + return "grp=" + grp.cacheOrGroupName() + ", demander=" + demander + ", topVer=" + demandMsg.topologyVersion() + ", topic=" + topicId; + } + + /** * Saves supply context with given parameters to {@code scMap}. * * @param contextId Supply context id. @@ -556,7 +551,8 @@ class GridDhtPartitionSupplier { T3<UUID, Integer, AffinityTopologyVersion> contextId, IgniteRebalanceIterator entryIt, Set<Integer> remainingParts, - long rebalanceId) { + long rebalanceId + ) { synchronized (scMap) { assert scMap.get(contextId) == null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index b4c582f..4e76f99 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -84,11 +84,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.LocalJoinCachesContext; import org.apache.ignite.internal.processors.cache.StateChangeRequest; import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsStateValidator; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridClientPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch; import org.apache.ignite.internal.processors.cache.mvcc.MvccCoordinator; @@ -1399,8 +1399,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (exchId.isLeft()) cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); - if (log.isDebugEnabled()) - log.debug("Before waiting for partition release future: " + this); + if (log.isTraceEnabled()) + log.trace("Before waiting for partition release future: " + this); int dumpCnt = 0; @@ -1611,6 +1611,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { assert node != null; + long time = System.currentTimeMillis(); + GridDhtPartitionsSingleMessage msg; // Reset lost partitions before sending local partitions to coordinator. @@ -1651,8 +1653,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else if (localJoinExchange()) msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); - if (log.isDebugEnabled()) - log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); + if (log.isTraceEnabled()) + log.trace("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); try { cctx.io().send(node, msg, SYSTEM_POOL); @@ -1661,6 +1663,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (log.isDebugEnabled()) log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']'); } + + if (log.isInfoEnabled()) + log.info("Sending Single Message performed in " + (System.currentTimeMillis() - time) + " ms."); } /** @@ -1700,8 +1705,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte ) { assert !nodes.contains(cctx.localNode()); - if (log.isDebugEnabled()) { - log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + + if (log.isTraceEnabled()) { + log.trace("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + ", exchId=" + exchId + ", msg=" + fullMsg + ']'); } @@ -1719,6 +1724,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte .map(singleMessage -> fullMsg.copy().joinedNodeAffinity(affinityForJoinedNodes)) .orElse(null); + long time = System.currentTimeMillis(); + // Prepare and send full messages for given nodes. nodes.stream() .map(node -> { @@ -1771,6 +1778,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte U.error(log, "Failed to send partitions [node=" + node + ']', e); } }); + + if (log.isInfoEnabled()) + log.info("Sending Full Message performed in " + (System.currentTimeMillis() - time) + " ms."); } /** @@ -2768,6 +2778,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private void detectLostPartitions(AffinityTopologyVersion resTopVer) { boolean detected = false; + long time = System.currentTimeMillis(); + synchronized (cctx.exchange().interruptLock()) { if (Thread.currentThread().isInterrupted()) return; @@ -2781,8 +2793,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (detected) + if (detected) { + if (log.isDebugEnabled()) + log.debug("Partitions have been scheduled to resend [reason=" + + "Lost partitions detect on " + resTopVer + "]"); + cctx.exchange().scheduleResendPartitions(); + } + + if (log.isInfoEnabled()) + log.info("Detecting lost partitions performed in " + (System.currentTimeMillis() - time) + " ms."); } /** @@ -2901,8 +2921,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (log.isInfoEnabled()) log.info("Coordinator received all messages, try merge [ver=" + initialVersion() + ']'); + long time = System.currentTimeMillis(); + boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this); + if (log.isInfoEnabled()) + log.info("Exchanges merging performed in " + (System.currentTimeMillis() - time) + " ms."); + if (!finish) return; } @@ -2937,6 +2962,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null; + long time = System.currentTimeMillis(); + if (exchCtx.mergeExchanges()) { synchronized (mux) { if (mergedJoinExchMsgs != null) { @@ -2970,6 +2997,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + if (log.isInfoEnabled()) + log.info("Affinity changes (coordinator) applied in " + (System.currentTimeMillis() - time) + " ms."); + Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null; for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { @@ -3051,6 +3081,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte IgniteProductVersion minVer = exchCtx.events().discoveryCache().minimumNodeVersion(); + time = System.currentTimeMillis(); + GridDhtPartitionsFullMessage msg = createPartitionsMessage(true, minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0); @@ -3067,6 +3099,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte msg.prepareMarshal(cctx); + if (log.isInfoEnabled()) + log.info("Preparing Full Message performed in " + (System.currentTimeMillis() - time) + " ms."); + synchronized (mux) { finishState = new FinishState(crd.id(), resTopVer, msg); @@ -3076,6 +3111,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (centralizedAff) { assert !exchCtx.mergeExchanges(); + time = System.currentTimeMillis(); + IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut = cctx.affinity().initAffinityOnNodeLeft(this); if (!fut.isDone()) { @@ -3087,6 +3124,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } else onAffinityInitialized(fut); + + if (log.isInfoEnabled()) + log.info("Centralized affinity changes are performed in " + (System.currentTimeMillis() - time) + " ms."); } else { Set<ClusterNode> nodes; @@ -3115,11 +3155,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte nodes.addAll(sndResNodes); } + time = System.currentTimeMillis(); + if (!nodes.isEmpty()) sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff); partitionsSent = true; + if (log.isInfoEnabled()) + log.info("Sending Full Message to all nodes performed in " + (System.currentTimeMillis() - time) + " ms."); + if (!stateChangeExchange()) onDone(exchCtx.events().topologyVersion(), null); @@ -3194,6 +3239,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * Validates that partition update counters and cache sizes for all caches are consistent. */ private void validatePartitionsState() { + long time = System.currentTimeMillis(); + for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) { CacheGroupDescriptor grpDesc = e.getValue(); if (grpDesc.config().getCacheMode() == CacheMode.LOCAL) @@ -3227,12 +3274,17 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte // TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833 } } + + if (log.isInfoEnabled()) + log.info("Partitions validation performed in " + (System.currentTimeMillis() - time) + " ms."); } /** * */ private void assignPartitionsStates() { + long time = System.currentTimeMillis(); + for (Map.Entry<Integer, CacheGroupDescriptor> e : cctx.affinity().cacheGroups().entrySet()) { CacheGroupDescriptor grpDesc = e.getValue(); if (grpDesc.config().getCacheMode() == CacheMode.LOCAL) @@ -3249,6 +3301,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else assignPartitionStates(top); } + + if (log.isInfoEnabled()) + log.info("Partitions assignment performed in " + (System.currentTimeMillis() - time) + " ms."); } /** @@ -3289,8 +3344,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { cctx.io().send(node, fullMsg, SYSTEM_POOL); - if (log.isDebugEnabled()) { - log.debug("Full message was sent to node: " + + if (log.isTraceEnabled()) { + log.trace("Full message was sent to node: " + node + ", fullMsg: " + fullMsg ); @@ -3566,6 +3621,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte AffinityTopologyVersion resTopVer = initialVersion(); + long time = System.currentTimeMillis(); + if (exchCtx.mergeExchanges()) { if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) { if (log.isInfoEnabled()) { @@ -3609,6 +3666,9 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte else if (forceAffReassignment) cctx.affinity().applyAffinityFromFullMessage(this, msg); + if (log.isInfoEnabled()) + log.info("Affinity changes applied in " + (System.currentTimeMillis() - time) + " ms."); + if (dynamicCacheStartExchange() && !F.isEmpty(exchangeGlobalExceptions)) { assert cctx.localNode().isClient(); @@ -3654,6 +3714,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte partHistSuppliers.putAll(msg.partitionHistorySuppliers()); + long time = System.currentTimeMillis(); + for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer grpId = entry.getKey(); @@ -3690,6 +3752,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } partitionsReceived = true; + + if (log.isInfoEnabled()) + log.info("Full map updating for " + msg.partitions().size() + + " groups performed in " + (System.currentTimeMillis() - time) + " ms."); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 888ab6a..b84dc79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -29,7 +29,7 @@ import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 7998e07..088fb31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -28,7 +28,7 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index f886767..034bf72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -19,7 +19,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.locks.ReadWriteLock; @@ -37,26 +36,24 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.lang.IgnitePredicate; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; -import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; +import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; /** * DHT cache preloader. @@ -231,7 +228,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (!grp.rebalanceEnabled()) return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion()); - int partCnt = grp.affinity().partitions(); + int partitions = grp.affinity().partitions(); AffinityTopologyVersion topVer = top.readyTopologyVersion(); @@ -246,7 +243,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { CachePartitionFullCountersMap countersMap = grp.topology().fullUpdateCounters(); - for (int p = 0; p < partCnt; p++) { + for (int p = 0; p < partitions; p++) { if (ctx.exchange().hasPendingExchange()) { if (log.isDebugEnabled()) log.debug("Skipping assignments creation, exchange worker has pending assignments: " + @@ -310,7 +307,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { ); } - msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partCnt); + msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions); } else { List<ClusterNode> picked = remoteOwners(p, topVer); @@ -478,8 +475,13 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED)) grp.addUnloadEvent(part.id()); - if (updateSeq) + if (updateSeq) { + if (log.isDebugEnabled()) + log.debug("Partitions have been scheduled to resend [reason=" + + "Eviction [grp" + grp.cacheOrGroupName() + " " + part.id() + "]"); + ctx.exchange().scheduleResendPartitions(); + } } finally { leaveBusy(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 6e847bb..6f98889 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -20,7 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.util.concurrent.ConcurrentHashMap; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopologyImpl; import org.apache.ignite.internal.util.typedef.internal.S; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java index d829b53..9fe3c64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/IgniteDhtDemandedPartitionsMap.java @@ -19,11 +19,8 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; -import java.util.Iterator; -import java.util.List; import java.util.Set; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -153,66 +150,24 @@ public class IgniteDhtDemandedPartitionsMap implements Serializable { return Collections.unmodifiableSet(full); } - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IgniteDhtDemandedPartitionsMap.class, this); - } - - /** - * @return String representation of partitions list. - */ - String partitionsList() { - List<Integer> s = new ArrayList<>(size()); + /** */ + public Set<Integer> historicalSet() { + if (historical == null) + return Collections.emptySet(); - s.addAll(fullSet()); + Set<Integer> historical = new HashSet<>(historicalMap().size()); for (int i = 0; i < historicalMap().size(); i++) { int p = historicalMap().partitionAt(i); - assert !s.contains(p); - - s.add(p); + historical.add(p); } - Collections.sort(s); - - StringBuilder sb = new StringBuilder(); - - int start = -1; - - int prev = -1; - - Iterator<Integer> sit = s.iterator(); - - while (sit.hasNext()) { - int p = sit.next(); - - if (start == -1) { - start = p; - prev = p; - } - - if (prev < p - 1) { - sb.append(start); - - if (start != prev) - sb.append("-").append(prev); - - sb.append(", "); - - start = p; - } - - if (!sit.hasNext()) { - sb.append(start); - - if (start != p) - sb.append("-").append(p); - } - - prev = p; - } + return historical; + } - return sb.toString(); + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(IgniteDhtDemandedPartitionsMap.class, this); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/9f546f87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictionContext.java new file mode 100644 index 0000000..1498aa6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/topology/EvictionContext.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.topology; + +/** + * Additional context for partition eviction process. + */ +public interface EvictionContext { + /** + * @return {@code true} If eviction process should be stopped. + */ + public boolean shouldStop(); +}