Repository: ignite Updated Branches: refs/heads/ignite-5578-locJoin [created] 6c52ee107
ignite-5578 Affinity for local join Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c52ee10 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c52ee10 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c52ee10 Branch: refs/heads/ignite-5578-locJoin Commit: 6c52ee107cde481f43bba1772267ab83361c9497 Parents: e93b284 Author: sboikov <sboi...@gridgain.com> Authored: Tue Jul 11 18:04:49 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jul 11 19:56:23 2017 +0300 ---------------------------------------------------------------------- .../communication/GridIoMessageFactory.java | 9 +- .../internal/managers/discovery/DiscoCache.java | 39 +++ .../cache/CacheAffinitySharedManager.java | 33 ++- .../processors/cache/ExchangeContext.java | 59 +++++ .../GridCachePartitionExchangeManager.java | 7 +- .../dht/preloader/CacheGroupAffinity.java | 156 +++++++++++ .../GridDhtPartitionsAbstractMessage.java | 9 + .../GridDhtPartitionsExchangeFuture.java | 259 +++++++++++++------ .../preloader/GridDhtPartitionsFullMessage.java | 90 ++++++- .../GridDhtPartitionsSingleMessage.java | 44 +++- .../GridCacheDatabaseSharedManager.java | 1 + 11 files changed, 593 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 3dac18e..261a619 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNe import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicUpdateResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.NearCacheUpdates; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.UpdateErrors; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CacheGroupAffinity; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; @@ -869,7 +870,13 @@ public class GridIoMessageFactory implements MessageFactory { break; - // [-3..119] [124..127] [-23..-27] [-36..-55]- this + case 128: + msg = new CacheGroupAffinity(); + + break; + + + // [-3..119] [124..128] [-23..-27] [-36..-55]- this // [120..123] - DR // [-4..-22, -30..-35] - SQL // [-54..-60] - Snapshots http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java index 4c1077b..72f482a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoCache.java @@ -280,6 +280,45 @@ public class DiscoCache { } /** + * @param order Order. + * @return Server node instance. 
+ */ + @Nullable public ClusterNode serverNodeByOrder(long order) { + int idx = serverNodeBinarySearch(order); + + if (idx >= 0) + return srvNodes.get(idx); + + return null; + } + + /** + * @param order Node order. + * @return Node index. + */ + private int serverNodeBinarySearch(long order) { + int low = 0; + int high = srvNodes.size() - 1; + + while (low <= high) { + int mid = (low + high) >>> 1; + + ClusterNode midVal = srvNodes.get(mid); + + int cmp = Long.compare(midVal.order(), order); + + if (cmp < 0) + low = mid + 1; + else if (cmp > 0) + high = mid - 1; + else + return mid; + } + + return -(low + 1); + } + + /** * @param nodes Cluster nodes. * @return Empty collection if nodes list is {@code null} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index 79ab183..f72d0e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -1224,6 +1224,19 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap } /** + * @param topVer Topology version. + * @param grpId Cache group ID. + * @return Affinity assignments. + */ + public List<List<ClusterNode>> affinity(AffinityTopologyVersion topVer, Integer grpId) { + CacheGroupHolder grpHolder = grpHolders.get(grpId); + + assert grpHolder != null : grpId; + + return grpHolder.affinity().assignments(topVer); + } + + /** * Called on exchange initiated by server node join. * * @param fut Exchange future. 
@@ -1319,18 +1332,22 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap grp.affinity().initialize(fut.topologyVersion(), assignment); } else { - CacheGroupDescriptor grpDesc = caches.group(grp.groupId()); + if (fut.context().fetchAffinityOnJoin()) { + CacheGroupDescriptor grpDesc = caches.group(grp.groupId()); - assert grpDesc != null : grp.cacheOrGroupName(); + assert grpDesc != null : grp.cacheOrGroupName(); - GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, - grpDesc.groupId(), - topVer, - fut.discoCache()); + GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cctx, + grpDesc.groupId(), + topVer, + fut.discoCache()); - fetchFut.init(false); + fetchFut.init(false); - fetchFuts.add(fetchFut); + fetchFuts.add(fetchFut); + } + else + fut.context().addGroupAffinityRequestOnJoin(grp.groupId()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java new file mode 100644 index 0000000..167ec4b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class ExchangeContext { + /** */ + private Set<Integer> requestGrpsAffOnJoin; + + /** */ + private boolean fetchAffOnJoin; + + /** + * @return {@code True} if on local join need fetch affinity per-group (old protocol), + * otherwise affinity is sent in {@link GridDhtPartitionsFullMessage}. + */ + public boolean fetchAffinityOnJoin() { + return false; + } + + /** + * @param grpId Cache group ID. + */ + public void addGroupAffinityRequestOnJoin(Integer grpId) { + if (requestGrpsAffOnJoin == null) + requestGrpsAffOnJoin = new HashSet<>(); + + requestGrpsAffOnJoin.add(grpId); + } + + /** + * @return Groups to request affinity for. 
+ */ + @Nullable public Set<Integer> groupsAffinityRequestOnJoin() { + return requestGrpsAffOnJoin; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index d4fe93f..69653a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1068,8 +1068,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @param id ID. */ private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) { - GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(node, - id, + GridDhtPartitionsSingleMessage m = createPartitionsSingleMessage(id, cctx.kernalContext().clientNode(), false); @@ -1090,14 +1089,12 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param targetNode Target node. * @param exchangeId ID. * @param clientOnlyExchange Client exchange flag. * @param sndCounters {@code True} if need send partition update counters. * @return Message. 
*/ - public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(ClusterNode targetNode, - @Nullable GridDhtPartitionExchangeId exchangeId, + public GridDhtPartitionsSingleMessage createPartitionsSingleMessage(@Nullable GridDhtPartitionExchangeId exchangeId, boolean clientOnlyExchange, boolean sndCounters) { http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java new file mode 100644 index 0000000..e29ee06 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/CacheGroupAffinity.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * + */ +public class CacheGroupAffinity implements Message { + /** */ + private int grpId; + + /** */ + @GridDirectCollection(GridLongList.class) + private List<GridLongList> assign; + + /** + * + */ + public CacheGroupAffinity() { + // No-op. + } + + /** + * @param grpId Group ID. + * @param assign0 Assignment. + */ + CacheGroupAffinity(int grpId, List<List<ClusterNode>> assign0) { + this.grpId = grpId; + + assign = new ArrayList<>(assign0.size()); + + for (int i = 0; i < assign0.size(); i++) { + List<ClusterNode> nodes = assign0.get(i); + + GridLongList l = new GridLongList(nodes.size()); + + for (int n = 0; n < nodes.size(); n++) + l.add(nodes.get(n).order()); + + assign.add(l); + } + } + + /** + * @return Cache group ID. + */ + int groupId() { + return grpId; + } + + /** + * @return Assignments. 
+ */ + List<GridLongList> assignments() { + return assign; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeCollection("assign", assign, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeInt("grpId", grpId)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + assign = reader.readCollection("assign", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + grpId = reader.readInt("grpId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(CacheGroupAffinity.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 128; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 2; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java index 441952d..20b33e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsAbstractMessage.java @@ -64,6 +64,15 @@ public abstract class GridDhtPartitionsAbstractMessage extends GridCacheMessage this.lastVer = lastVer; } + /** + * @param msg Message. 
+ */ + void copyStateTo(GridDhtPartitionsAbstractMessage msg) { + msg.exchId = exchId; + msg.lastVer = lastVer; + msg.flags = flags; + } + /** {@inheritDoc} */ @Override public boolean cacheGroupMessage() { return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 8989aaa..72a3fe5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerT import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; +import org.apache.ignite.internal.processors.cache.ExchangeContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -75,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import 
org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -187,9 +189,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private CacheAffinityChangeMessage affChangeMsg; - /** */ - private boolean clientOnlyExchange; - /** Init timestamp. Used to track the amount of time spent to complete the future. */ private long initTs; @@ -219,6 +218,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private final AtomicBoolean done = new AtomicBoolean(); + /** */ + @GridToStringExclude + private ExchangeContext exchCtx; + /** * @param cctx Cache context. * @param busyLock Busy lock. @@ -254,6 +257,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @return Exchange context. + */ + public ExchangeContext context() { + assert exchCtx != null : this; + + return exchCtx; + } + + /** * @param exchActions Exchange actions. 
*/ public void exchangeActions(ExchangeActions exchActions) { @@ -423,6 +435,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert discoEvt != null : this; assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this; + exchCtx = new ExchangeContext(); + try { discoCache.updateAlives(cctx.discovery()); @@ -479,26 +493,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); } - else { - cctx.activate(); - - List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = - cctx.cache().cachesToStartOnLocalJoin(); - - if (cctx.database().persistenceEnabled() && - !cctx.kernalContext().clientNode()) { - List<DynamicCacheDescriptor> startDescs = new ArrayList<>(); - - if (caches != null) { - for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches) - startDescs.add(c.get1()); - } - - cctx.database().readCheckpointAndRestoreMemory(startDescs); - } - - cctx.cache().startCachesOnLocalJoin(caches, topVer); - } + else + initCachesOnLocalJoin(); } exchange = CU.clientNode(discoEvt.eventNode()) ? @@ -565,6 +561,29 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @throws IgniteCheckedException If failed. */ + private void initCachesOnLocalJoin() throws IgniteCheckedException { + cctx.activate(); + + List<T2<DynamicCacheDescriptor, NearCacheConfiguration>> caches = + cctx.cache().cachesToStartOnLocalJoin(); + + if (cctx.database().persistenceEnabled() && !cctx.kernalContext().clientNode()) { + List<DynamicCacheDescriptor> startDescs = new ArrayList<>(); + + if (caches != null) { + for (T2<DynamicCacheDescriptor, NearCacheConfiguration> c : caches) + startDescs.add(c.get1()); + } + + cctx.database().readCheckpointAndRestoreMemory(startDescs); + } + + cctx.cache().startCachesOnLocalJoin(caches, topologyVersion()); + } + + /** + * @throws IgniteCheckedException If failed. 
+ */ private void initTopologies() throws IgniteCheckedException { cctx.database().checkpointReadLock(); @@ -783,40 +802,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte * @throws IgniteCheckedException If failed. */ private void clientOnlyExchange() throws IgniteCheckedException { - clientOnlyExchange = true; - if (crd != null) { - if (crd.isLocal()) { - for (CacheGroupContext grp : cctx.cache().cacheGroups()) { - boolean updateTop = !grp.isLocal() && - exchId.topologyVersion().equals(grp.localStartVersion()); - - if (updateTop) { - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { - if (top.groupId() == grp.groupId()) { - GridDhtPartitionFullMap fullMap = top.partitionMap(true); + assert !crd.isLocal() : crd; - assert fullMap != null; - - grp.topology().update(topologyVersion(), - fullMap, - top.updateCounters(false), - Collections.<Integer>emptySet()); - - break; - } - } - } - } - } - else { - if (!centralizedAff) - sendLocalPartitions(crd); + if (!centralizedAff) + sendLocalPartitions(crd); - initDone(); + initDone(); - return; - } + return; } else { if (centralizedAff) { // Last server node failed. 
@@ -869,7 +863,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte if (grp.isLocal() || cacheGroupStopping(grp.groupId())) continue; - grp.topology().beforeExchange(this, !centralizedAff); + if (!localJoinExchange() || cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom())) + grp.topology().beforeExchange(this, !centralizedAff); } cctx.database().beforeExchange(this); @@ -889,8 +884,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte */ private void tryToPerformLocalSnapshotOperation() { try { - IgniteInternalFuture fut = cctx.snapshot() - .tryStartLocalSnapshotOperation(discoEvt); + IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(discoEvt); if (fut != null) fut.get(); @@ -1096,12 +1090,21 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** + * @return {@code True} if exchange for local node join. + */ + private boolean localJoinExchange() { + return discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal(); + } + + /** * @param node Node. * @throws IgniteCheckedException If failed. */ private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException { assert node != null; + GridDhtPartitionsSingleMessage msg; + // Reset lost partition before send local partition to coordinator. 
if (exchActions != null) { Set<String> caches = exchActions.cachesToResetLostPartitions(); @@ -1110,22 +1113,33 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte resetLostPartitions(caches); } - GridDhtPartitionsSingleMessage m = cctx.exchange().createPartitionsSingleMessage( - node, exchangeId(), clientOnlyExchange, true); + if (cctx.kernalContext().clientNode() || (localJoinExchange() && !cctx.database().persistenceEnabled())) { + msg = new GridDhtPartitionsSingleMessage(exchangeId(), + cctx.kernalContext().clientNode(), + cctx.versions().last(), + true); + } + else { + msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(), + false, + true); - Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; + Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved; - if (partHistReserved0 != null) - m.partitionHistoryCounters(partHistReserved0); + if (partHistReserved0 != null) + msg.partitionHistoryCounters(partHistReserved0); + } if (stateChangeExchange() && changeGlobalStateE != null) - m.setError(changeGlobalStateE); + msg.setError(changeGlobalStateE); + else if (localJoinExchange()) + msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin()); if (log.isDebugEnabled()) - log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']'); + log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']'); try { - cctx.io().send(node, m, SYSTEM_POOL); + cctx.io().send(node, msg, SYSTEM_POOL); } catch (ClusterTopologyCheckedException ignored) { if (log.isDebugEnabled()) @@ -1155,21 +1169,40 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param nodes Nodes. + * @param cachesAff Affinity if was requested by some nodes. * @throws IgniteCheckedException If failed. 
*/ - private void sendAllPartitions(Collection<ClusterNode> nodes) throws IgniteCheckedException { - GridDhtPartitionsFullMessage m = createPartitionsMessage(true); + private void sendAllPartitions(Collection<ClusterNode> nodes, Collection<CacheGroupAffinity> cachesAff) + throws IgniteCheckedException { + GridDhtPartitionsFullMessage msg = createPartitionsMessage(true); + + GridDhtPartitionsFullMessage msgWithAff = null; assert !nodes.contains(cctx.localNode()); if (log.isDebugEnabled()) { log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + - ", exchId=" + exchId + ", msg=" + m + ']'); + ", exchId=" + exchId + ", msg=" + msg + ']'); } + msg.prepareMarshal(cctx); + for (ClusterNode node : nodes) { + GridDhtPartitionsFullMessage sndMsg = msg; + + if (cachesAff != null) { + GridDhtPartitionsSingleMessage singleMsg = msgs.get(node.id()); + + if (singleMsg != null && singleMsg.cacheGroupsAffinityRequest() != null) { + if (msgWithAff == null) + msgWithAff = msg.copyWithAffinity(cachesAff); + + sndMsg = msgWithAff; + } + } + try { - cctx.io().send(node, m, SYSTEM_POOL); + cctx.io().send(node, sndMsg, SYSTEM_POOL); } catch (IgniteCheckedException e) { if (cctx.io().checkNodeLeft(node.id(), e, false)) { @@ -1687,21 +1720,32 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - for (GridDhtPartitionsAbstractMessage msg : msgs.values()) { - if (msg instanceof GridDhtPartitionsSingleMessage) { - GridDhtPartitionsSingleMessage msg0 = (GridDhtPartitionsSingleMessage)msg; + Map<Integer, CacheGroupAffinity> cachesAff = null; + + for (GridDhtPartitionsSingleMessage msg : msgs.values()) { + for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { + Integer grpId = entry.getKey(); + CacheGroupContext grp = cctx.cache().cacheGroup(grpId); - for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg0.partitions().entrySet()) { - Integer grpId = entry.getKey(); - CacheGroupContext grp = 
cctx.cache().cacheGroup(grpId); + GridDhtPartitionTopology top = grp != null ? grp.topology() : + cctx.exchange().clientTopology(grpId, this); - GridDhtPartitionTopology top = grp != null ? grp.topology() : - cctx.exchange().clientTopology(grpId, this); + Map<Integer, T2<Long, Long>> cntrs = msg.partitionUpdateCounters(grpId); - Map<Integer, T2<Long, Long>> cntrs = msg0.partitionUpdateCounters(grpId); + if (cntrs != null) + top.applyUpdateCounters(cntrs); + } + + Collection<Integer> affReq = msg.cacheGroupsAffinityRequest(); + + if (affReq != null) { + if (cachesAff == null) + cachesAff = U.newHashMap(affReq.size()); - if (cntrs != null) - top.applyUpdateCounters(cntrs); + for (Integer grpId : affReq) { + List<List<ClusterNode>> assign = cctx.affinity().affinity(topologyVersion(), grpId); + + cachesAff.put(grpId, new CacheGroupAffinity(grpId, assign)); } } } @@ -1777,7 +1821,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } if (!nodes.isEmpty()) - sendAllPartitions(nodes); + sendAllPartitions(nodes, cachesAff != null ? 
cachesAff.values() : null); onDone(exchangeId().topologyVersion(), err); } @@ -1813,7 +1857,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte try { if (n != null) - sendAllPartitions(F.asList(n)); + sendAllPartitions(F.asList(n), null); } catch (IgniteCheckedException e) { if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) { @@ -1907,6 +1951,59 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } + Set<Integer> affReq = exchCtx.groupsAffinityRequestOnJoin(); + + if (localJoinExchange() && affReq != null) { + Map<Long, ClusterNode> nodesByOrder = new HashMap<>(); + + Collection<CacheGroupAffinity> cachesAff = msg.cachesAffinity(); + + assert !F.isEmpty(cachesAff) : cachesAff; + assert cachesAff.size() >= affReq.size(); + + int cnt = 0; + + for (CacheGroupAffinity aff : cachesAff) { + CacheGroupContext grp = cctx.cache().cacheGroup(aff.groupId()); + + if (grp != null) { + assert AffinityTopologyVersion.NONE.equals(grp.affinity().lastVersion()); + + List<GridLongList> assignments = aff.assignments(); + List<List<ClusterNode>> assignments0 = new ArrayList<>(assignments.size()); + + for (int p = 0; p < assignments.size(); p++) { + GridLongList assign = assignments.get(p); + List<ClusterNode> assign0 = new ArrayList<>(assign.size()); + + for (int n = 0; n < assign.size(); n++) { + long order = assign.get(n); + + ClusterNode affNode = nodesByOrder.get(order); + + if (affNode == null) { + affNode = discoCache.serverNodeByOrder(order); + + assert affNode != null : order; + + nodesByOrder.put(order, affNode); + } + + assign0.add(affNode); + } + + assignments0.add(assign0); + } + + grp.affinity().initialize(topologyVersion(), assignments0); + + cnt++; + } + } + + assert affReq.size() == cnt : cnt; + } + updatePartitionFullMap(msg); IgniteCheckedException err = null; 
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 75609b8..0a723f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -19,19 +19,23 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import 
org.apache.ignite.internal.util.typedef.internal.U; @@ -99,6 +103,10 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa @GridDirectTransient private transient boolean compress; + /** */ + @GridDirectCollection(CacheGroupAffinity.class) + private Collection<CacheGroupAffinity> cachesAff; + /** * Required by {@link Externalizable}. */ @@ -126,6 +134,48 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa } /** {@inheritDoc} */ + @Override void copyStateTo(GridDhtPartitionsAbstractMessage msg) { + super.copyStateTo(msg); + + GridDhtPartitionsFullMessage cp = (GridDhtPartitionsFullMessage)msg; + + cp.parts = parts; + cp.dupPartsData = dupPartsData; + cp.partsBytes = partsBytes; + cp.partCntrs = partCntrs; + cp.partCntrsBytes = partCntrsBytes; + cp.partHistSuppliers = partHistSuppliers; + cp.partHistSuppliersBytes = partHistSuppliersBytes; + cp.partsToReload = partsToReload; + cp.partsToReloadBytes = partsToReloadBytes; + cp.topVer = topVer; + cp.cachesAff = cachesAff; + } + + /** + * @param cachesAff Affinity. + * @return Message copy. + */ + public GridDhtPartitionsFullMessage copyWithAffinity(Collection<CacheGroupAffinity> cachesAff) { + assert !F.isEmpty(cachesAff) : cachesAff; + + GridDhtPartitionsFullMessage cp = new GridDhtPartitionsFullMessage(); + + copyStateTo(cp); + + cp.cachesAff = cachesAff; + + return cp; + } + + /** + * @return Affinity. 
+ */ + @Nullable public Collection<CacheGroupAffinity> cachesAffinity() { + return cachesAff; + } + + /** {@inheritDoc} */ @Override public int handlerId() { return 0; } @@ -406,42 +456,48 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (writer.state()) { case 5: - if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) + if (!writer.writeCollection("cachesAff", cachesAff, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 6: - if (!writer.writeByteArray("errsBytes", errsBytes)) + if (!writer.writeMap("dupPartsData", dupPartsData, MessageCollectionItemType.INT, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 7: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeByteArray("errsBytes", errsBytes)) return false; writer.incrementState(); case 8: - if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 9: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partHistSuppliersBytes", partHistSuppliersBytes)) return false; writer.incrementState(); case 10: - if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) + if (!writer.writeByteArray("partsBytes", partsBytes)) return false; writer.incrementState(); case 11: + if (!writer.writeByteArray("partsToReloadBytes", partsToReloadBytes)) + return false; + + writer.incrementState(); + + case 12: if (!writer.writeMessage("topVer", topVer)) return false; @@ -464,7 +520,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (reader.state()) { case 5: - dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); + cachesAff = reader.readCollection("cachesAff", 
MessageCollectionItemType.MSG); if (!reader.isLastRead()) return false; @@ -472,7 +528,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 6: - errsBytes = reader.readByteArray("errsBytes"); + dupPartsData = reader.readMap("dupPartsData", MessageCollectionItemType.INT, MessageCollectionItemType.INT, false); if (!reader.isLastRead()) return false; @@ -480,7 +536,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 7: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + errsBytes = reader.readByteArray("errsBytes"); if (!reader.isLastRead()) return false; @@ -488,7 +544,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 8: - partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -496,7 +552,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 9: - partsBytes = reader.readByteArray("partsBytes"); + partHistSuppliersBytes = reader.readByteArray("partHistSuppliersBytes"); if (!reader.isLastRead()) return false; @@ -504,7 +560,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 10: - partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); + partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) return false; @@ -512,6 +568,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 11: + partsToReloadBytes = reader.readByteArray("partsToReloadBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 12: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -531,7 +595,7 @@ public class 
GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 12; + return 13; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index b4d25c4..4c98742 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -17,12 +17,14 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; +import java.util.Collection; import java.util.Map; import java.util.HashMap; import java.nio.ByteBuffer; import java.util.Collections; import java.io.Externalizable; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -88,6 +90,10 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes @GridDirectTransient private transient boolean compress; + /** */ + @GridDirectCollection(Integer.class) + private Collection<Integer> grpsAffRequest; + /** * Required by {@link Externalizable}. 
*/ @@ -111,6 +117,20 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes this.compress = compress; } + /** + * @param grpsAffRequest IDs of cache groups whose affinity is requested. + */ + public void cacheGroupsAffinityRequest(Collection<Integer> grpsAffRequest) { + this.grpsAffRequest = grpsAffRequest; + } + + /** + * @return IDs of cache groups whose affinity is requested, or {@code null} if none were set. + */ + @Nullable public Collection<Integer> cacheGroupsAffinityRequest() { + return grpsAffRequest; + } + /** {@inheritDoc} */ @Override public int handlerId() { return 0; @@ -374,18 +394,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 8: - if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + if (!writer.writeCollection("grpsAffRequest", grpsAffRequest, MessageCollectionItemType.INT)) return false; writer.incrementState(); case 9: - if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 10: + if (!writer.writeByteArray("partHistCntrsBytes", partHistCntrsBytes)) + return false; + + writer.incrementState(); + + case 11: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -432,7 +458,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 8: - partCntrsBytes = reader.readByteArray("partCntrsBytes"); + grpsAffRequest = reader.readCollection("grpsAffRequest", MessageCollectionItemType.INT); if (!reader.isLastRead()) return false; @@ -440,7 +466,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 9: - partHistCntrsBytes = reader.readByteArray("partHistCntrsBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -448,6 +474,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 10: + partHistCntrsBytes = 
reader.readByteArray("partHistCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -467,7 +501,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 11; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6c52ee10/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 5136731..1f56cc0 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1540,6 +1540,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan long partMetaId = pageMem.partitionMetaPageId(grpId, i); long partMetaPage = pageMem.acquirePage(grpId, partMetaId); + try { long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);