ignite-5075
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f974c53b Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f974c53b Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f974c53b Branch: refs/heads/ignite-5075 Commit: f974c53b9e73881037eea7213102685493c27bc1 Parents: cd27689 Author: sboikov <sboi...@gridgain.com> Authored: Thu May 18 01:23:41 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu May 18 01:23:41 2017 +0300 ---------------------------------------------------------------------- .../cache/CacheAffinitySharedManager.java | 69 ++++++++++++-------- .../processors/cache/CacheGroupDescriptor.java | 4 +- .../cache/CacheGroupInfrastructure.java | 10 --- .../processors/cache/ClusterCachesInfo.java | 5 +- .../processors/cache/GridCacheAttributes.java | 4 ++ .../processors/cache/GridCacheIoManager.java | 17 +++-- .../GridCachePartitionExchangeManager.java | 15 +++-- .../distributed/dht/GridDhtLocalPartition.java | 2 +- .../dht/GridDhtPartitionTopology.java | 2 - .../dht/GridDhtPartitionTopologyImpl.java | 24 +++---- .../preloader/GridDhtPartitionsFullMessage.java | 2 + 11 files changed, 89 insertions(+), 65 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 7db6dc2..c2dbf61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -234,9 +234,9 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap boolean rebalanced = true; if (partWait != null) { - CacheGroupHolder cache = grpHolders.get(checkGrpId); + CacheGroupHolder grpHolder = grpHolders.get(checkGrpId); - if (cache != null) { + if (grpHolder != null) { for (Iterator<Map.Entry<Integer, UUID>> it = partWait.entrySet().iterator(); it.hasNext(); ) { Map.Entry<Integer, UUID> e = it.next(); @@ -325,7 +325,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap for (CacheGroupDescriptor stopDesc : exchActions.cacheGroupsToStop()) { CacheGroupDescriptor rmvd = registeredGrps.remove(stopDesc.groupId()); - assert rmvd != null : stopDesc.groupName(); + assert rmvd != null : stopDesc.cacheOrGroupName(); } for (CacheGroupDescriptor startDesc : exchActions.cacheGroupsToStart()) { @@ -392,14 +392,24 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } } - for (CacheGroupInfrastructure grp : cctx.cache().cacheGroups()) { - if (grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) - initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut, false); - } + Set<Integer> gprs = new HashSet<>(); - if (crd) { - for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) - initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor()); + for (ExchangeActions.ActionData action : exchActions.newAndClientCachesStartRequests()) { + Integer grpId = action.descriptor().groupDescriptor().groupId(); + + if (gprs.add(grpId)) { + if (crd && lateAffAssign) + initStartedGroupOnCoordinator(fut, action.descriptor().groupDescriptor()); + else { + CacheGroupInfrastructure grp = cctx.cache().cacheGroup(grpId); + + if (grp != null && grp.localStartVersion().equals(fut.topologyVersion())) { + assert grp.affinity().lastVersion().equals(AffinityTopologyVersion.NONE) : grp.affinity().lastVersion(); + + initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut); + } + } + } } List<ExchangeActions.ActionData> closeReqs = exchActions.closeRequests(cctx.localNodeId()); @@ -801,7 +811,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grpHolder.affinity().initialize(fut.topologyVersion(), newAff); } else if (grpHolder.client() && grp != null) { - assert grp.affinity().idealAssignment() != null; + assert grpHolder.affinity().idealAssignment() != null; grpHolder = new CacheGroupHolder1(grp, grpHolder.affinity()); @@ -845,7 +855,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap forAllCacheGroups(false, new IgniteInClosureX<GridAffinityAssignmentCache>() { @Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException { if (aff.lastVersion().equals(AffinityTopologyVersion.NONE)) - initAffinity(registeredGrps.get(aff.groupId()), aff, fut, false); + initAffinity(registeredGrps.get(aff.groupId()), aff, fut); } }); } @@ -855,17 +865,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap * @param desc Cache group descriptor. * @param aff Affinity. * @param fut Exchange future. - * @param fetch Force fetch flag. * @throws IgniteCheckedException If failed. */ private void initAffinity(CacheGroupDescriptor desc, GridAffinityAssignmentCache aff, - GridDhtPartitionsExchangeFuture fut, - boolean fetch) + GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { assert desc != null : aff.cacheOrGroupName(); - if (!fetch && canCalculateAffinity(desc, aff, fut)) { + if (canCalculateAffinity(desc, aff, fut)) { List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), fut.discoCache()); aff.initialize(fut.topologyVersion(), assignment); @@ -894,16 +902,15 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap assert desc != null : aff.cacheOrGroupName(); // Do not request affinity from remote nodes if affinity function is not centralized. - if (!aff.centralizedAffinityFunction()) + if (!lateAffAssign && !aff.centralizedAffinityFunction()) return true; // If local node did not initiate exchange or local node is the only cache node in grid. - Collection<ClusterNode> affNodes = - cctx.discovery().cacheGroupAffinityNodes(aff.groupId(), fut.topologyVersion()); + Collection<ClusterNode> affNodes = fut.discoCache().cacheGroupAffinityNodes(aff.groupId()); return fut.cacheGroupAddedOnExchange(aff.groupId(), desc.receivedFrom()) || !fut.exchangeId().nodeId().equals(cctx.localNodeId()) || - (affNodes.size() == 1 && affNodes.contains(cctx.localNode())); + (affNodes.isEmpty() || (affNodes.size() == 1 && affNodes.contains(cctx.localNode()))); } /** @@ -970,7 +977,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap StringBuilder names = new StringBuilder(); for (Integer grpId : grpIds) { - String name = registeredGrps.get(grpId).groupName(); + String name = registeredGrps.get(grpId).cacheOrGroupName(); if (names.length() != 0) names.append(", "); @@ -1004,7 +1011,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap else { CacheGroupDescriptor grpDesc = registeredGrps.get(grp.groupId()); - assert grpDesc != null : grp.nameForLog(); + assert grpDesc != null : grp.cacheOrGroupName(); GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, grpDesc, @@ -1116,7 +1123,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (grp.isLocal()) continue; - initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut, false); + initAffinity(registeredGrps.get(grp.groupId()), grp.affinity(), fut); } } @@ -1127,6 +1134,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ private IgniteInternalFuture<?> initCoordinatorCaches(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + assert lateAffAssign; + final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>(); forAllRegisteredCacheGroups(new IgniteInClosureX<CacheGroupDescriptor>() { @@ -1169,7 +1178,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (log.isDebugEnabled()) { log.debug("Need initialize affinity on coordinator [" + - "cacheGrp=" + desc.groupName() + + "cacheGrp=" + desc.cacheOrGroupName() + "prevAff=" + prev.topologyVersion() + ']'); } @@ -1266,6 +1275,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ @Nullable private WaitRebalanceInfo initAffinityOnNodeJoin(final GridDhtPartitionsExchangeFuture fut, boolean crd) throws IgniteCheckedException { + assert lateAffAssign; + AffinityTopologyVersion topVer = fut.topologyVersion(); final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>(); @@ -1320,7 +1331,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap AffinityTopologyVersion affTopVer = aff.lastVersion(); - assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.groupId() + + assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() + ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']'; List<List<ClusterNode>> curAff = aff.assignments(affTopVer); @@ -1454,6 +1465,8 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap */ private Map<Integer, Map<Integer, List<UUID>>> initAffinityOnNodeLeft0(final GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException { + assert lateAffAssign; + final AffinityTopologyVersion topVer = fut.topologyVersion(); final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer); @@ -1472,7 +1485,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap AffinityTopologyVersion affTopVer = grpHolder.affinity().lastVersion(); assert affTopVer.topologyVersion() > 0 && !affTopVer.equals(topVer) : "Invalid affinity version " + - "[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.groupName() + ']'; + "[last=" + affTopVer + ", futVer=" + topVer + ", grp=" + desc.cacheOrGroupName() + ']'; List<List<ClusterNode>> curAssignment = grpHolder.affinity().assignments(affTopVer); List<List<ClusterNode>> newAssignment = grpHolder.affinity().idealAssignment(); @@ -1493,7 +1506,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap List<ClusterNode> newNodes0 = null; assert newPrimary == null || aliveNodes.contains(newPrimary) : "Invalid new primary [" + - "grp=" + desc.groupName() + + "grp=" + desc.cacheOrGroupName() + ", node=" + newPrimary + ", topVer=" + topVer + ']'; @@ -1568,7 +1581,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap if (log.isDebugEnabled()) { log.debug("Computed new affinity after node left [topVer=" + topVer + - ", waitCaches=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); + ", waitGrps=" + (info != null ? groupNames(info.waitGrps.keySet()) : null) + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java index af92f91..eb82992 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupDescriptor.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -41,6 +42,7 @@ public class CacheGroupDescriptor { private final IgniteUuid deploymentId; /** */ + @GridToStringExclude private final CacheConfiguration cacheCfg; /** */ @@ -166,6 +168,6 @@ public class CacheGroupDescriptor { } @Override public String toString() { - return S.toString(CacheGroupDescriptor.class, this); + return S.toString(CacheGroupDescriptor.class, this, "cacheName", cacheCfg.getName()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java index 800fdb2..e112bbc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupInfrastructure.java @@ -492,16 +492,6 @@ public class CacheGroupInfrastructure { } /** - * @return Group name. - */ - public String nameForLog() { - if (ccfg.getGroupName() == null) - return "[cache, name=" + ccfg.getName() + ']'; - - return "[cacheGroup, name=" + ccfg.getGroupName() + ']'; - } - - /** * @return Group name if it is specified, otherwise cache name. */ public String cacheOrGroupName() { http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 4938382..b5dbc95 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -498,7 +498,7 @@ class ClusterCachesInfo { for (CacheGroupInfrastructure grp : ctx.cache().cacheGroups()) { CacheGroupDescriptor desc = grps.get(grp.groupId()); - assert desc != null : grp.nameForLog(); + assert desc != null : grp.cacheOrGroupName(); cacheGrpsInfo.put(grp.groupId(), new CacheClientReconnectDiscoveryData.CacheGroupInfo(desc.config(), desc.deploymentId(), @@ -1068,6 +1068,9 @@ class ClusterCachesInfo { CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "rebalanceOrder", "Rebalance order", cfg.getRebalanceOrder(), startCfg.getRebalanceOrder(), false); + + CU.validateCacheGroupsAttributesMismatch(log, cfg, startCfg, "partitionLossPolicy", "Partition Loss Policy", + cfg.getPartitionLossPolicy(), startCfg.getPartitionLossPolicy(), true); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java index 96d5a0a..ca640f2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAttributes.java @@ -279,6 +279,10 @@ public class GridCacheAttributes implements Serializable { return className(ccfg.getNodeFilter()); } + String topologyValidatorClassName() { + return className(ccfg.getTopologyValidator()); + } + /** * @param obj Object to get class of. * @return Class name or {@code null}. http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index c4462c9..f63f18c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -162,7 +162,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { log.debug("Wait for exchange before processing message [msg=" + msg + ", node=" + nodeId + ", waitVer=" + startTopVer + - ", cacheDesc=" + cacheDescriptor(cacheMsg) + ']'); + ", cacheDesc=" + descriptorForMessage(cacheMsg) + ']'); } fut.listen(new CI1<IgniteInternalFuture<?>>() { @@ -251,8 +251,17 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { } }; - private DynamicCacheDescriptor cacheDescriptor(GridCacheMessage msg) { - return null; // TODO IGNITE-5075. + /** + * @param msg Message. + * @return Cache or group descriptor. + */ + private Object descriptorForMessage(GridCacheMessage msg) { + if (msg instanceof GridCacheIdMessage) + return cctx.cache().cacheDescriptor(((GridCacheIdMessage)msg).cacheId()); + else if (msg instanceof GridCacheGroupIdMessage) + return cctx.cache().cacheGroupDescriptors().get(((GridCacheGroupIdMessage)msg).groupId()); + + return null; } /** @@ -296,7 +305,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { msg0.append(", locTopVer=").append(cctx.exchange().readyAffinityVersion()). append(", msgTopVer=").append(cacheMsg.topologyVersion()). - append(", cacheDesc=").append(cacheDescriptor(cacheMsg)). + append(", desc=").append(descriptorForMessage(cacheMsg)). append(']'); msg0.append(U.nl()).append("Registered listeners:"); http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index c651a65..7351d26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -919,12 +919,14 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { GridDhtPartitionFullMap map = top.partitionMap(true); - addFullPartitionsMap(m, - dupData, - compress, - top.groupId(), - map, - top.similarAffinityKey()); + if (map != null) { + addFullPartitionsMap(m, + dupData, + compress, + top.groupId(), + map, + top.similarAffinityKey()); + } if (exchId != null) m.addPartitionUpdateCounters(top.groupId(), top.updateCounters(true)); @@ -947,6 +949,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana Integer grpId, GridDhtPartitionFullMap map, Object affKey) { + assert map != null; Integer dupDataCache = null; if (compress && affKey != null && !m.containsGroup(grpId)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 5cb48ad..20dbdc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -1152,7 +1152,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @param cacheId Cache ID. */ void onCacheStopped(int cacheId) { - assert grp.sharedGroup() : grp.nameForLog(); + assert grp.sharedGroup() : grp.cacheOrGroupName(); for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); it.hasNext();) { RemovedEntryHolder e = it.next(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java index 9617a0d..1e38cec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java @@ -134,8 +134,6 @@ public interface GridDhtPartitionTopology { public void releasePartitions(int... parts); /** - * @param key Cache key. - * @param create If {@code true}, then partition will be created if it's not there. * @return Local partition. * @throws GridDhtInvalidPartitionException If partition is evicted or absent and * does not belong to this node. http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 64c208b..a47d854 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -243,7 +243,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (dumpCnt++ < GridDhtPartitionsExchangeFuture.DUMP_PENDING_OBJECTS_THRESHOLD) { U.warn(log, "Failed to wait for partition eviction [" + "topVer=" + topVer + - ", group=" + grp.nameForLog() + + ", group=" + grp.cacheOrGroupName() + ", part=" + part.id() + ", partState=" + part.state() + ", size=" + part.internalSize() + @@ -337,7 +337,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { AffinityTopologyVersion topVer = this.topVer; assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + - ", group=" + grp.nameForLog() + ']'; + ", group=" + grp.cacheOrGroupName() + ']'; return topVer; } @@ -387,12 +387,12 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology [topVer=" + topVer + - ", grp=" + grp.nameForLog() + + ", grp=" + grp.cacheOrGroupName() + ", futVer=" + exchFut.topologyVersion() + ", fut=" + exchFut + ']'; assert grp.affinity().lastVersion().equals(exchFut.topologyVersion()) : "Invalid affinity [topVer=" + grp.affinity().lastVersion() + - ", grp=" + grp.nameForLog() + + ", grp=" + grp.cacheOrGroupName() + ", futVer=" + exchFut.topologyVersion() + ", fut=" + exchFut + ']'; @@ -414,7 +414,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { boolean owned = locPart.own(); - assert owned : "Failed to own partition for oldest node [grp=" + grp.nameForLog() + + assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'; if (log.isDebugEnabled()) @@ -654,7 +654,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (F.isEmpty(owners)) { boolean owned = locPart.own(); - assert owned : "Failed to own partition [grp=" + grp.nameForLog() + ", locPart=" + + assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + ", locPart=" + locPart + ']'; updateSeq = updateLocal(p, locPart.state(), updateSeq); @@ -961,7 +961,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer + ", topVer2=" + this.topVer + ", node=" + ctx.igniteInstanceName() + - ", grp=" + grp.nameForLog() + + ", grp=" + grp.cacheOrGroupName() + ", node2part=" + node2part + ']'; List<ClusterNode> nodes = null; @@ -1016,7 +1016,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + ", allIds=" + allIds + ", node2part=" + node2part + - ", grp=" + grp.nameForLog() + ']'; + ", grp=" + grp.cacheOrGroupName() + ']'; Collection<UUID> nodeIds = part2node.get(p); @@ -1091,7 +1091,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { return null; assert node2part.valid() : "Invalid node2part [node2part=" + node2part + - ", grp=" + grp.nameForLog() + + ", grp=" + grp.cacheOrGroupName() + ", stopping=" + stopping + ", locNodeId=" + ctx.localNode().id() + ", locName=" + ctx.igniteInstanceName() + ']'; @@ -1949,7 +1949,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { try { assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part + - ", grp=" + grp.nameForLog() + + ", grp=" + grp.cacheOrGroupName() + ", stopping=" + stopping + ", locNodeId=" + ctx.localNodeId() + ", locName=" + ctx.igniteInstanceName() + ']'; @@ -1984,7 +1984,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @Override public void printMemoryStats(int threshold) { X.println(">>> Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() + - ", grp=" + grp.nameForLog() + ']'); + ", grp=" + grp.cacheOrGroupName() + ']'); lock.readLock().lock(); @@ -2039,7 +2039,7 @@ public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { rebalancedTopVer = topVer; if (log.isDebugEnabled()) - log.debug("Updated rebalanced version [cache=" + grp.nameForLog() + ", ver=" + rebalancedTopVer + ']'); + log.debug("Updated rebalanced version [cache=" + grp.cacheOrGroupName() + ", ver=" + rebalancedTopVer + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f974c53b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 9fc0f12..f9bc5df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -140,6 +140,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param dupDataCache Optional ID of cache with the same partition state map. */ public void addFullPartitionsMap(int grpId, GridDhtPartitionFullMap fullMap, @Nullable Integer dupDataCache) { + assert fullMap != null; + if (parts == null) parts = new HashMap<>();