Repository: ignite Updated Branches: refs/heads/master aec3f91c2 -> ce73c9d85
IGNITE-9868 Improved background full message sending - Fixes #4975. Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ce73c9d8 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ce73c9d8 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ce73c9d8 Branch: refs/heads/master Commit: ce73c9d85ffd5b50e4c3370b13302605392fa572 Parents: aec3f91 Author: Sergey Antonov <[email protected]> Authored: Wed Oct 17 20:08:34 2018 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Wed Oct 17 20:18:36 2018 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 177 ++++++++++++++----- .../GridDhtPartitionsExchangeFuture.java | 12 +- 2 files changed, 135 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ce73c9d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 0baf5a3..6af9678 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1028,11 +1028,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * Partition refresh callback. + * Partition refresh callback for selected cache groups. * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send, * for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send + * + * @param grps Cache groups for partitions refresh. */ - public void refreshPartitions() { + public void refreshPartitions(@NotNull Collection<CacheGroupContext> grps) { // TODO https://issues.apache.org/jira/browse/IGNITE-6857 if (cctx.snapshot().snapshotOperationInProgress()) { scheduleResendPartitions(); @@ -1040,6 +1042,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return; } + if (grps.isEmpty()) { + if (log.isDebugEnabled()) + log.debug("Skip partitions refresh, there are no cache groups for partition refresh."); + + return; + } + ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE); if (oldest == null) { @@ -1049,8 +1058,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana return; } - if (log.isDebugEnabled()) - log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + ']'); + if (log.isDebugEnabled()) { + log.debug("Refreshing partitions [oldest=" + oldest.id() + ", loc=" + cctx.localNodeId() + + ", cacheGroups= " + grps + ']'); + } // If this is the oldest node. if (oldest.id().equals(cctx.localNodeId())) { @@ -1068,50 +1079,66 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // No need to send to nodes which did not finish their first exchange. AffinityTopologyVersion rmtTopVer = - lastFut != null ? (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) : AffinityTopologyVersion.NONE; + lastFut != null ? + (lastFut.isDone() ? lastFut.topologyVersion() : lastFut.initialVersion()) + : AffinityTopologyVersion.NONE; Collection<ClusterNode> rmts = cctx.discovery().remoteAliveNodesWithCaches(rmtTopVer); if (log.isDebugEnabled()) log.debug("Refreshing partitions from oldest node: " + cctx.localNodeId()); - sendAllPartitions(rmts, rmtTopVer); + sendAllPartitions(rmts, rmtTopVer, grps); } else { if (log.isDebugEnabled()) log.debug("Refreshing local partitions from non-oldest node: " + cctx.localNodeId()); - sendLocalPartitions(oldest, null); + sendLocalPartitions(oldest, null, grps); } } /** + * Partition refresh callback. + * For coordinator causes {@link GridDhtPartitionsFullMessage FullMessages} send, + * for non coordinator - {@link GridDhtPartitionsSingleMessage SingleMessages} send + */ + public void refreshPartitions() { refreshPartitions(cctx.cache().cacheGroups()); } + + /** * @param nodes Nodes. * @param msgTopVer Topology version. Will be added to full message. + * @param grps Selected cache groups. */ private void sendAllPartitions( Collection<ClusterNode> nodes, - AffinityTopologyVersion msgTopVer + AffinityTopologyVersion msgTopVer, + Collection<CacheGroupContext> grps ) { long time = System.currentTimeMillis(); - GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null); + GridDhtPartitionsFullMessage m = createPartitionsFullMessage(true, false, null, null, null, null, grps); m.topologyVersion(msgTopVer); if (log.isInfoEnabled()) { long latency = System.currentTimeMillis() - time; - if (latency > 100 || log.isDebugEnabled()) - log.info("Full Message creating for " + msgTopVer + " performed in " + latency + " ms."); + if (latency > 50 || log.isDebugEnabled()) { + log.info("Finished full message creation [msgTopVer=" + msgTopVer + ", groups=" + grps + + ", latency=" + latency + "ms]"); + } } if (log.isTraceEnabled()) - log.trace("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", msg=" + m + ']'); + log.trace("Sending all partitions [nodeIds=" + U.nodeIds(nodes) + ", cacheGroups=" + grps + + ", msg=" + m + ']'); time = System.currentTimeMillis(); + Collection<ClusterNode> failedNodes = U.newHashSet(nodes.size()); + for (ClusterNode node : nodes) { try { assert !node.equals(cctx.localNode()); @@ -1119,22 +1146,34 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana cctx.io().sendNoRetry(node, m, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignore) { - if (log.isDebugEnabled()) - log.debug("Failed to send partition update to node because it left grid (will ignore) [node=" + - node.id() + ", msg=" + m + ']'); + if (log.isDebugEnabled()) { + log.debug("Failed to send partition update to node because it left grid (will ignore) " + + "[node=" + node.id() + ", msg=" + m + ']'); + } } catch (IgniteCheckedException e) { - U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']'); + failedNodes.add(node); + + U.warn(log, "Failed to send partitions full message [node=" + node + ", err=" + e + ']', e); } } - if (log.isInfoEnabled()) - log.info("Sending Full Message for " + msgTopVer + " performed in " + (System.currentTimeMillis() - time) + " ms."); + if (log.isInfoEnabled()) { + long latency = System.currentTimeMillis() - time; + + if (latency > 50 || log.isDebugEnabled()) { + log.info("Finished sending full message [msgTopVer=" + msgTopVer + ", groups=" + grps + + (failedNodes.isEmpty() ? "" : (", skipped=" + U.nodeIds(failedNodes))) + + ", latency=" + latency + "ms]"); + } + } } /** + * Creates partitions full message for all cache groups. + * * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/ - * finishUnmarshall methods are called). + * finishUnmarshall methods are called). * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}. * @param exchId Non-null exchange ID if message is created for exchange. * @param lastVer Last version. @@ -1150,18 +1189,43 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, @Nullable IgniteDhtPartitionsToReloadMap partsToReload ) { - final GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(exchId, - lastVer, - exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE, - partHistSuppliers, - partsToReload - ); + Collection<CacheGroupContext> grps = cctx.cache().cacheGroups(); + + return createPartitionsFullMessage(compress, newCntrMap, exchId, lastVer, partHistSuppliers, partsToReload, grps); + } + + /** + * Creates partitions full message for selected cache groups. + * + * @param compress {@code True} if possible to compress message (properly work only if prepareMarshall/ + * finishUnmarshall methods are called). + * @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}. + * @param exchId Non-null exchange ID if message is created for exchange. + * @param lastVer Last version. + * @param partHistSuppliers Partition history suppliers map. + * @param partsToReload Partitions to reload map. + * @param grps Selected cache groups. + * @return Message. + */ + public GridDhtPartitionsFullMessage createPartitionsFullMessage( + boolean compress, + boolean newCntrMap, + @Nullable final GridDhtPartitionExchangeId exchId, + @Nullable GridCacheVersion lastVer, + @Nullable IgniteDhtPartitionHistorySuppliersMap partHistSuppliers, + @Nullable IgniteDhtPartitionsToReloadMap partsToReload, + Collection<CacheGroupContext> grps + ) { + AffinityTopologyVersion ver = exchId != null ? exchId.topologyVersion() : AffinityTopologyVersion.NONE; + + final GridDhtPartitionsFullMessage m = + new GridDhtPartitionsFullMessage(exchId, lastVer, ver, partHistSuppliers, partsToReload); m.compress(compress); final Map<Object, T2<Integer, GridDhtPartitionFullMap>> dupData = new HashMap<>(); - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : grps) { if (!grp.isLocal()) { if (exchId != null) { AffinityTopologyVersion startTopVer = grp.localStartVersion(); @@ -1174,14 +1238,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana GridDhtPartitionFullMap locMap = grp.topology().partitionMap(true); - if (locMap != null) { - addFullPartitionsMap(m, - dupData, - compress, - grp.groupId(), - locMap, - affCache.similarAffinityKey()); - } + if (locMap != null) + addFullPartitionsMap(m, dupData, compress, grp.groupId(), locMap, affCache.similarAffinityKey()); m.addPartitionSizes(grp.groupId(), grp.topology().globalPartSizes()); @@ -1202,14 +1260,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { GridDhtPartitionFullMap map = top.partitionMap(true); - if (map != null) { - addFullPartitionsMap(m, - dupData, - compress, - top.groupId(), - map, - top.similarAffinityKey()); - } + if (map != null) + addFullPartitionsMap(m, dupData, compress, top.groupId(), map, top.similarAffinityKey()); if (exchId != null) { CachePartitionFullCountersMap cntrsMap = top.fullUpdateCounters(); @@ -1269,13 +1321,15 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param node Destination cluster node. * @param id Exchange ID. + * @param grps Cache groups for send partitions. */ - private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id, - cctx.kernalContext().clientNode(), - false, - false, - null); + private void sendLocalPartitions( + ClusterNode node, + @Nullable GridDhtPartitionExchangeId id, + @NotNull Collection<CacheGroupContext> grps + ) { + GridDhtPartitionsSingleMessage m = + createPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), false, false, null, grps); if (log.isTraceEnabled()) log.trace("Sending local partitions [nodeId=" + node.id() + ", msg=" + m + ']'); @@ -1294,6 +1348,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * Creates partitions single message for all cache groups. + * * @param exchangeId Exchange ID. * @param clientOnlyExchange Client exchange flag. * @param sndCounters {@code True} if need send partition update counters. @@ -1307,6 +1363,29 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean newCntrMap, ExchangeActions exchActions ) { + Collection<CacheGroupContext> grps = cctx.cache().cacheGroups(); + + return createPartitionsSingleMessage(exchangeId, clientOnlyExchange, sndCounters, newCntrMap, exchActions, grps); + } + + /** + * Creates partitions single message for selected cache groups. + * + * @param exchangeId Exchange ID. + * @param clientOnlyExchange Client exchange flag. + * @param sndCounters {@code True} if need send partition update counters. + * @param newCntrMap {@code True} if possible to use {@link CachePartitionPartialCountersMap}. + * @param grps Selected cache groups. + * @return Message. + */ + public GridDhtPartitionsSingleMessage createPartitionsSingleMessage( + @Nullable GridDhtPartitionExchangeId exchangeId, + boolean clientOnlyExchange, + boolean sndCounters, + boolean newCntrMap, + ExchangeActions exchActions, + Collection<CacheGroupContext> grps + ) { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(exchangeId, clientOnlyExchange, cctx.versions().last(), @@ -1314,7 +1393,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana Map<Object, T2<Integer, GridPartitionStateMap>> dupData = new HashMap<>(); - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { + for (CacheGroupContext grp : grps) { if (!grp.isLocal() && (exchActions == null || !exchActions.cacheGroupStopping(grp.groupId()))) { GridDhtPartitionMap locMap = grp.topology().localPartitionMap(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ce73c9d8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index e550a8b..0fe1a25 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -2080,22 +2080,24 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (centralizedAff || forceAffReassignment) { assert !exchCtx.mergeExchanges(); + Collection<CacheGroupContext> grpToRefresh = U.newHashSet(cctx.cache().cacheGroups().size()); + for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) continue; - boolean needRefresh = false; - try { - needRefresh = grp.topology().initPartitionsWhenAffinityReady(res, this); + if (grp.topology().initPartitionsWhenAffinityReady(res, this)) + grpToRefresh.add(grp); } catch (IgniteInterruptedCheckedException e) { U.error(log, "Failed to initialize partitions.", e); } - if (needRefresh) - cctx.exchange().refreshPartitions(); } + + if (!grpToRefresh.isEmpty()) + cctx.exchange().refreshPartitions(grpToRefresh); } for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
