ignite-5578 Affinity for local join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4a46272c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4a46272c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4a46272c Branch: refs/heads/ignite-5578 Commit: 4a46272c61821e90e48c1e843f5dd1eda0320a09 Parents: 0e7064d Author: sboikov <sboi...@gridgain.com> Authored: Wed Jul 12 18:25:39 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Wed Jul 12 18:37:55 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 4 +- .../cache/CacheAffinitySharedManager.java | 1 + .../dht/preloader/CacheGroupAffinity.java | 159 ------------- .../preloader/CacheGroupAffinityMessage.java | 229 +++++++++++++++++++ .../GridDhtPartitionsExchangeFuture.java | 91 ++------ .../preloader/GridDhtPartitionsFullMessage.java | 11 +- .../CacheLateAffinityAssignmentTest.java | 2 +- 7 files changed, 262 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 261a619..003c2f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -81,7 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinity; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinityMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -871,7 +871,7 @@ public class GridIoMessageFactory implements MessageFactory { break; case 128: - msg = new CacheGroupAffinity(); + msg = new CacheGroupAffinityMessage(); break; http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 3f24547..879e6a9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1460,6 +1460,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param fut Exchange future. + * @param newAff {@code True} if there are no older nodes with affinity info available. * @throws IgniteCheckedException If failed. * @return Future completed when caches initialization is done. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java deleted file mode 100644 index 1e1509a..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java +++ /dev/null @@ -1,159 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.GridDirectCollection; -import org.apache.ignite.internal.util.GridLongList; -import org.apache.ignite.plugin.extensions.communication.Message; -import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; -import org.apache.ignite.plugin.extensions.communication.MessageReader; -import org.apache.ignite.plugin.extensions.communication.MessageWriter; - -/** - * - */ -public class CacheGroupAffinity implements Message { - /** */ - private static final long serialVersionUID = 0L; - - /** */ - private int grpId; - - /** */ - @GridDirectCollection(GridLongList.class) - private List<GridLongList> assign; - - /** - * - */ - public CacheGroupAffinity() { - // No-op. - } - - /** - * @param grpId Group ID. - * @param assign0 Assignment. - */ - CacheGroupAffinity(int grpId, List<List<ClusterNode>> assign0) { - this.grpId = grpId; - - assign = new ArrayList<>(assign0.size()); - - for (int i = 0; i < assign0.size(); i++) { - List<ClusterNode> nodes = assign0.get(i); - - GridLongList l = new GridLongList(nodes.size()); - - for (int n = 0; n < nodes.size(); n++) - l.add(nodes.get(n).order()); - - assign.add(l); - } - } - - /** - * @return Cache group ID. - */ - int groupId() { - return grpId; - } - - /** - * @return Assignments. - */ - List<GridLongList> assignments() { - return assign; - } - - /** {@inheritDoc} */ - @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { - writer.setBuffer(buf); - - if (!writer.isHeaderWritten()) { - if (!writer.writeHeader(directType(), fieldsCount())) - return false; - - writer.onHeaderWritten(); - } - - switch (writer.state()) { - case 0: - if (!writer.writeCollection("assign", assign, MessageCollectionItemType.MSG)) - return false; - - writer.incrementState(); - - case 1: - if (!writer.writeInt("grpId", grpId)) - return false; - - writer.incrementState(); - - } - - return true; - } - - /** {@inheritDoc} */ - @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { - reader.setBuffer(buf); - - if (!reader.beforeMessageRead()) - return false; - - switch (reader.state()) { - case 0: - assign = reader.readCollection("assign", MessageCollectionItemType.MSG); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - case 1: - grpId = reader.readInt("grpId"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - - } - - return reader.afterMessageRead(CacheGroupAffinity.class); - } - - /** {@inheritDoc} */ - @Override public short directType() { - return 128; - } - - /** {@inheritDoc} */ - @Override public byte fieldsCount() { - return 2; - } - - /** {@inheritDoc} */ - @Override public void onAckReceived() { - // No-op. - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java new file mode 100644 index 0000000..5cd5d26 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinityMessage.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class CacheGroupAffinityMessage implements Message { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private int grpId; + + /** */ + @GridDirectCollection(GridLongList.class) + private List<GridLongList> assigns; + + /** + * + */ + public CacheGroupAffinityMessage() { + // No-op. + } + + /** + * @param grpId Group ID. + * @param assign0 Assignment. + */ + private CacheGroupAffinityMessage(int grpId, List<List<ClusterNode>> assign0) { + this.grpId = grpId; + + assigns = new ArrayList<>(assign0.size()); + + for (int i = 0; i < assign0.size(); i++) { + List<ClusterNode> nodes = assign0.get(i); + + GridLongList l = new GridLongList(nodes.size()); + + for (int n = 0; n < nodes.size(); n++) + l.add(nodes.get(n).order()); + + assigns.add(l); + } + } + + /** + * @return Cache group ID. + */ + int groupId() { + return grpId; + } + + /** + * @param cctx Context. + * @param topVer Topology version. + * @param affReq Cache group IDs. + * @param cachesAff Optional already prepared affinity. + * @return Affinity. + */ + static Map<Integer, CacheGroupAffinityMessage> createAffinityMessages( + GridCacheSharedContext cctx, + AffinityTopologyVersion topVer, + Collection<Integer> affReq, + @Nullable Map<Integer, CacheGroupAffinityMessage> cachesAff) { + assert !F.isEmpty(affReq); + + if (cachesAff == null) + cachesAff = U.newHashMap(affReq.size()); + + for (Integer grpId : affReq) { + if (!cachesAff.containsKey(grpId)) { + List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topVer); + + cachesAff.put(grpId, new CacheGroupAffinityMessage(grpId, assign)); + } + } + + return cachesAff; + } + + /** + * @param nodesByOrder Nodes by order cache. + * @param discoCache Discovery data cache. + * @return Assignments. + */ + List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) { + List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size()); + + for (int p = 0; p < assigns.size(); p++) { + GridLongList assign = assigns.get(p); + List<ClusterNode> assign0 = new ArrayList<>(assign.size()); + + for (int n = 0; n < assign.size(); n++) { + long order = assign.get(n); + + ClusterNode affNode = nodesByOrder.get(order); + + if (affNode == null) { + affNode = discoCache.serverNodeByOrder(order); + + assert affNode != null : order; + + nodesByOrder.put(order, affNode); + } + + assign0.add(affNode); + } + + assignments0.add(assign0); + } + + return assignments0; + } + + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeCollection("assigns", assigns, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("grpId", grpId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + assigns = reader.readCollection("assigns", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + grpId = reader.readInt("grpId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CacheGroupAffinityMessage.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 128; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheGroupAffinityMessage.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4a39bae..ab66df3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -77,7 +77,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.GridPartitionStateMap; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -1183,7 +1182,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param cachesAff Affinity if was requested by some nodes. * @throws IgniteCheckedException If failed. */ - private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinity> cachesAff) + private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinityMessage> cachesAff) throws IgniteCheckedException { boolean singleNode = nodes.size() == 1; @@ -1443,7 +1442,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } - processMessage(node.id(), msg); + processSingleMessage(node.id(), msg); } }); } @@ -1453,7 +1452,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param nodeId Sender node. * @param msg Message. */ - private void processMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { + private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) { boolean allReceived = false; boolean updateSingleMap = false; @@ -1723,29 +1722,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @param affReq Cache group IDs. - * @param cachesAff Optional already prepared affinity. - * @return Affinity. - */ - private Map<Integer, CacheGroupAffinity> initCachesAffinity(Collection<Integer> affReq, - @Nullable Map<Integer, CacheGroupAffinity> cachesAff) { - assert !F.isEmpty(affReq); - - if (cachesAff == null) - cachesAff = U.newHashMap(affReq.size()); - - for (Integer grpId : affReq) { - if (!cachesAff.containsKey(grpId)) { - List<List<ClusterNode>> assign = cctx.affinity().affinity(grpId).assignments(topologyVersion()); - - cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign)); - } - } - - return cachesAff; - } - - /** * */ private void onAllReceived() { @@ -1761,7 +1737,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - Map<Integer, CacheGroupAffinity> cachesAff = null; + Map<Integer, CacheGroupAffinityMessage> cachesAff = null; for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) { GridDhtPartitionsSingleMessage msg = e.getValue(); @@ -1784,7 +1760,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); if (affReq != null) { - cachesAff = initCachesAffinity(affReq, cachesAff); + cachesAff = CacheGroupAffinityMessage.createAffinityMessages(cctx, + topologyVersion(), + affReq, + cachesAff); UUID nodeId = e.getKey(); @@ -1930,10 +1909,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (n != null) { Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); - Collection<CacheGroupAffinity> cachesAff = null; + Collection<CacheGroupAffinityMessage> cachesAff = null; if (affReq != null) { - Map<Integer, CacheGroupAffinity> affMap = initCachesAffinity(affReq, null); + Map<Integer, CacheGroupAffinityMessage> affMap = CacheGroupAffinityMessage.createAffinityMessages( + cctx, + msg.exchangeId().topologyVersion(), + affReq, + null); cachesAff = affMap.values(); } @@ -2004,7 +1987,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return; } - processMessage(node, msg); + processFullMessage(node, msg); } }); } @@ -2038,7 +2021,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @param node Sender node. * @param msg Message. */ - private void processMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { + private void processFullMessage(ClusterNode node, GridDhtPartitionsFullMessage msg) { assert exchId.equals(msg.exchangeId()) : msg; assert msg.lastVersion() != null : msg; @@ -2065,50 +2048,27 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (localJoinExchange() && affReq != null) { Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); - Collection<CacheGroupAffinity> cachesAff = msg.cachesAffinity(); + Collection<CacheGroupAffinityMessage> cachesAff = msg.cachesAffinity(); assert !F.isEmpty(cachesAff) : msg; assert cachesAff.size() >= affReq.size(); int cnt = 0; - for (CacheGroupAffinity aff : cachesAff) { + for (CacheGroupAffinityMessage aff : cachesAff) { if (affReq.contains(aff.groupId())) { CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId()); assert grp != null : aff.groupId(); assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion()); - List<GridLongList> assignments = aff.assignments(); - List<List<ClusterNode>> assignments0 = new ArrayList<>(assignments.size()); - - for (int p = 0; p < assignments.size(); p++) { - GridLongList assign = assignments.get(p); - List<ClusterNode> assign0 = new ArrayList<>(assign.size()); - - for (int n = 0; n < assign.size(); n++) { - long order = assign.get(n); - - ClusterNode affNode = nodesByOrder.get(order); - - if (affNode == null) { - affNode = discoCache.serverNodeByOrder(order); - - assert affNode != null : order; - - nodesByOrder.put(order, affNode); - } - - assign0.add(affNode); - } - - assignments0.add(assign0); - } + List<List<ClusterNode>> assignments = aff.createAssignments(nodesByOrder, discoCache); + // Calculate ideal assignments. if (!grp.affinity().centralizedAffinityFunction()) grp.affinity().calculate(topologyVersion(), discoEvt, discoCache); - grp.affinity().initialize(topologyVersion(), assignments0); + grp.affinity().initialize(topologyVersion(), assignments); try { grp.topology().initPartitions(this); @@ -2309,7 +2269,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { boolean crdChanged = false; boolean allReceived = false; - Set<UUID> remaining0 = null; ClusterNode crd0; @@ -2328,9 +2287,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (crd != null && crd.isLocal()) { - if (crdChanged) - remaining0 = new HashSet<>(remaining); - else if (crdReady && rmvd) + if (!crdChanged && crdReady && rmvd) allReceived = remaining.isEmpty(); } @@ -2390,7 +2347,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte sendPartitions(crd0); for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) - processMessage(m.getKey(), m.getValue()); + processFullMessage(m.getKey(), m.getValue()); } } } @@ -2447,7 +2404,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet()) - processMessage(m.getKey(), m.getValue()); + processSingleMessage(m.getKey(), m.getValue()); } else { awaitSingleMapUpdates(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 1ef383a..edc9c9e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; @@ -104,8 +103,8 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa private transient boolean compress; /** */ - @GridDirectCollection(CacheGroupAffinity.class) - private Collection<CacheGroupAffinity> cachesAff; + @GridDirectCollection(CacheGroupAffinityMessage.class) + private Collection<CacheGroupAffinityMessage> cachesAff; /** * Required by {@link Externalizable}. @@ -156,7 +155,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param cachesAff Affinity. * @return Message copy. */ - GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinity> cachesAff) { + GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinityMessage> cachesAff) { assert !F.isEmpty(cachesAff) : cachesAff; GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage(); @@ -171,14 +170,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** * @return Affinity. */ - @Nullable Collection<CacheGroupAffinity> cachesAffinity() { + @Nullable Collection<CacheGroupAffinityMessage> cachesAffinity() { return cachesAff; } /** * @param cachesAff Affinity. */ - void cachesAffinity(Collection<CacheGroupAffinity> cachesAff) { + void cachesAffinity(Collection<CacheGroupAffinityMessage> cachesAff) { this.cachesAff = cachesAff; } http://git-wip-us.apache.org/repos/asf/ignite/blob/4a46272c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java index bb99266..23043d1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java @@ -1458,7 +1458,7 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - public void _testRandomOperations() throws Exception { + public void testRandomOperations() throws Exception { forceSrvMode = true; final int MAX_SRVS = 10;