Merge remote-tracking branch 'remotes/origin/ignite-5578-locJoin' into ignite-5578
# Conflicts: # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java # modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7970ff77 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7970ff77 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7970ff77 Branch: refs/heads/ignite-5578 Commit: 7970ff776d6a6e464595da5cc015fbd775601d36 Parents: 7a54832 545c76e Author: sboikov <sboi...@gridgain.com> Authored: Thu Jul 13 13:46:17 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Thu Jul 13 13:46:17 2017 +0300 ---------------------------------------------------------------------- examples/config/example-memory-policies.xml | 7 +- .../examples/datagrid/CacheQueryDdlExample.java | 3 +- .../jdbc2/JdbcDefaultNoOpCacheTest.java | 33 ++ .../JettyRestProcessorAbstractSelfTest.java | 2 +- .../ignite/jdbc/JdbcDefaultNoOpCacheTest.java | 35 ++ .../ignite/jdbc/JdbcNoDefaultCacheTest.java | 50 +- .../jdbc/suite/IgniteJdbcDriverTestSuite.java | 5 +- .../communication/GridIoMessageFactory.java | 9 +- .../internal/managers/discovery/DiscoCache.java | 39 ++ .../eventstorage/GridEventStorageManager.java | 309 ++++++------ .../eventstorage/HighPriorityListener.java | 28 ++ .../affinity/GridAffinityAssignmentCache.java | 8 + .../cache/CacheAffinitySharedManager.java | 153 ++++-- .../processors/cache/ExchangeContext.java | 55 ++- .../processors/cache/GridCacheMvccManager.java | 5 - .../GridCachePartitionExchangeManager.java | 45 +- .../processors/cache/GridCacheProcessor.java | 6 +- .../processors/cache/IgniteCacheProxy.java | 5 +- .../dht/GridClientPartitionTopology.java | 7 +- .../dht/GridDhtPartitionTopology.java | 12 +- .../dht/GridDhtPartitionTopologyImpl.java | 45 +- .../cache/distributed/dht/GridDhtTxLocal.java | 13 - .../preloader/CacheGroupAffinityMessage.java | 229 +++++++++ .../GridDhtPartitionsAbstractMessage.java | 9 + .../GridDhtPartitionsExchangeFuture.java | 480 +++++++++++++------ .../preloader/GridDhtPartitionsFullMessage.java | 76 ++- .../GridDhtPartitionsSingleMessage.java | 40 +- .../GridCacheDatabaseSharedManager.java | 9 +- .../cluster/GridClusterStateProcessor.java | 4 + .../continuous/GridContinuousProcessor.java | 91 ++-- .../processors/igfs/IgfsNoopProcessor.java | 11 + .../internal/processors/igfs/IgfsProcessor.java | 12 +- .../processors/igfs/IgfsProcessorAdapter.java | 3 +- .../processors/rest/GridRestProcessor.java | 47 +- .../processors/task/GridTaskProcessor.java | 18 +- .../ignite/internal/util/GridListSet.java | 8 + .../visor/VisorCoordinatorNodeTask.java | 39 ++ .../communication/tcp/TcpCommunicationSpi.java | 30 +- .../resources/META-INF/classnames.properties | 13 +- .../cache/GridCacheLuceneQueryIndexTest.java | 466 ------------------ .../GridCacheQueryIndexingDisabledSelfTest.java | 16 +- .../IgniteClusterActivateDeactivateTest.java | 2 +- .../IgniteMarshallerCacheFSRestoreTest.java | 2 + .../CacheLateAffinityAssignmentTest.java | 61 +-- .../distributed/CachePartitionStateTest.java | 410 ++++++++++++++++ .../TestCacheNodeExcludingFilter.java | 53 ++ .../db/IgnitePdsCacheRestoreTest.java | 208 ++++++++ .../ignite/testsuites/IgnitePdsTestSuite.java | 5 +- modules/indexing/pom.xml | 12 + .../query/h2/opt/GridLuceneDirectory.java | 47 +- .../query/h2/opt/GridLuceneIndex.java | 75 +-- .../query/h2/opt/GridLuceneInputStream.java | 94 ++-- .../query/h2/opt/GridLuceneLockFactory.java | 45 +- .../query/h2/opt/GridLuceneOutputStream.java | 72 +-- .../cache/GridCacheFullTextQuerySelfTest.java | 367 ++++++++++++++ .../IgniteCacheAbstractFieldsQuerySelfTest.java | 30 +- ...teCacheFullTextQueryNodeJoiningSelfTest.java | 4 +- ...niteCachePartitionedFieldsQuerySelfTest.java | 20 +- .../IgniteCacheQuerySelfTestSuite.java | 7 +- .../Cache/Query/CacheQueriesTest.cs | 41 +- .../Apache.Ignite.Core/IgniteConfiguration.cs | 2 + .../helpers/jade/form/form-field-checkbox.pug | 2 +- .../helpers/jade/form/form-field-dropdown.pug | 1 - .../frontend/app/helpers/jade/mixins.pug | 1 + .../configuration/generator/Maven.service.js | 10 +- .../frontend/app/primitives/dropdown/index.scss | 4 + .../app/primitives/form-field/index.scss | 48 +- .../frontend/app/primitives/index.js | 2 + .../frontend/app/primitives/radio/index.pug | 41 ++ .../frontend/app/primitives/radio/index.scss | 78 +++ .../frontend/app/primitives/switch/index.pug | 34 -- .../frontend/app/primitives/switch/index.scss | 87 ---- .../frontend/app/primitives/switcher/index.scss | 2 +- .../frontend/app/primitives/tooltip/index.scss | 25 + .../app/primitives/typography/index.scss | 2 +- .../app/primitives/ui-grid-settings/index.scss | 47 +- .../frontend/app/primitives/ui-grid/index.scss | 13 +- .../frontend/app/services/Confirm.service.js | 2 +- .../frontend/app/services/Messages.service.js | 3 + .../ignite/console/demo/AgentClusterDemo.java | 12 + .../demo/service/DemoCachesLoadService.java | 1 + parent/pom.xml | 4 +- 82 files changed, 3020 insertions(+), 1431 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7970ff77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index a8cf249,8a293ae..7659e73 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@@ -1223,38 -1231,19 +1231,50 @@@ public class CacheAffinitySharedManager (affNodes.isEmpty() || (affNodes.size() == 1 && affNodes.contains(cctx.localNode()))); } + public List<List<ClusterNode>> affinity(AffinityTopologyVersion topVer, Integer grpId) { + CacheGroupHolder grpHolder = grpHolders.get(grpId); + + assert grpHolder != null : grpId; + + return grpHolder.affinity().assignments(topVer); + } + + /** + * @param crd Coordinator flag. + * @throws IgniteCheckedException If failed. + */ + public void onLocalJoin(boolean crd) throws IgniteCheckedException { + + } + + public void processDiscoveryEvents(ExchangeDiscoveryEvents evts) { + AffinityTopologyVersion topVer = evts.topologyVersion(); + + if (evts.serverLeft()) { + + } + else if (evts.serverJoin()) { + + } + else { + + } + } + + /** + * @param grpId Cache group ID. + * @return Affinity assignments. + */ + public GridAffinityAssignmentCache affinity(Integer grpId) { + CacheGroupHolder grpHolder = grpHolders.get(grpId); + + assert grpHolder != null : debugGroupName(grpId); + + return grpHolder.affinity(); + } + + /** * Called on exchange initiated by server node join. * * @param fut Exchange future. http://git-wip-us.apache.org/repos/asf/ignite/blob/7970ff77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java index 8d880a6,6caca5f..cac88be --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeContext.java @@@ -17,16 -17,9 +17,19 @@@ package org.apache.ignite.internal.processors.cache; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; + import java.util.HashSet; + import java.util.Set; + import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.jetbrains.annotations.Nullable; /** @@@ -34,67 -27,40 +37,69 @@@ */ public class ExchangeContext { /** */ + private Set<Integer> requestGrpsAffOnJoin; + + /** */ + private boolean fetchAffOnJoin; + ++ /** */ + private final boolean coalescing; + + /** */ + private AffinityTopologyVersion resTopVer; + + /** */ + private final Map<Integer, List<List<ClusterNode>>> affMap = new HashMap<>(); + - /** */ - private Set<Integer> cacheGrpsOnLocStart; - // - // private Set<UUID> joinedNodes; - // - // public boolean nodeJoined(UUID nodeId) { - // return joinedNodes != null && joinedNodes.contains(nodeId); - // } - /** - * @param coalescing + * @param protocolVer Protocol version. */ - public ExchangeContext(AffinityTopologyVersion resTopVer, boolean coalescing) { - this.coalescing = coalescing; - this.resTopVer = resTopVer; + public ExchangeContext(int protocolVer) { + fetchAffOnJoin = protocolVer == 1; } - public AffinityTopologyVersion resultTopologyVersion() { - return resTopVer; - } - - public boolean coalescing() { - return coalescing; + /** + * @return {@code True} if on local join need fetch affinity per-group (old protocol), + * otherwise affinity is sent in {@link GridDhtPartitionsFullMessage}. + */ + boolean fetchAffinityOnJoin() { + return fetchAffOnJoin; } - public void addCacheGroupOnLocalStart(Integer grpId) { - if (cacheGrpsOnLocStart == null) - cacheGrpsOnLocStart = new HashSet<>(); + /** + * @param grpId Cache group ID. + */ + void addGroupAffinityRequestOnJoin(Integer grpId) { + if (requestGrpsAffOnJoin == null) + requestGrpsAffOnJoin = new HashSet<>(); - cacheGrpsOnLocStart.add(grpId); + requestGrpsAffOnJoin.add(grpId); } - @Nullable public Set<Integer> cacheGroupsOnLocalStart() { - return cacheGrpsOnLocStart; + /** + * @return Groups to request affinity for. + */ + @Nullable public Set<Integer> groupsAffinityRequestOnJoin() { + return requestGrpsAffOnJoin; } + + public List<List<ClusterNode>> activeAffinity(GridCacheSharedContext cctx, GridAffinityAssignmentCache aff) { + List<List<ClusterNode>> assignment = affMap.get(aff.groupId()); + + if (assignment != null) + return assignment; + + AffinityTopologyVersion affTopVer = aff.lastVersion(); + + assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [grp=" + aff.cacheOrGroupName() + + ", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']'; + + List<List<ClusterNode>> curAff = aff.assignments(affTopVer); + + assert aff.idealAssignment() != null : "Previous assignment is not available."; + + affMap.put(aff.groupId(), curAff); + + return curAff; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/7970ff77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index fa32b5f,06f336e..6646456 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@@ -130,8 -131,8 +131,8 @@@ public class GridCachePartitionExchange private static final int EXCHANGE_HISTORY_SIZE = IgniteSystemProperties.getInteger(IgniteSystemProperties.IGNITE_EXCHANGE_HISTORY_SIZE, 1_000); - /** */ + /** TODO IGNITE-5578. */ - public static final IgniteProductVersion EXCHANGE_COALESCING_SINCE = IgniteProductVersion.fromString("2.0.0"); + private static final IgniteProductVersion EXCHANGE_PROTOCOL_2_SINCE = IgniteProductVersion.fromString("2.1.0"); /** Atomic reference for pending timeout object. */ private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/7970ff77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index d51e537,513f950..a461c1c --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@@ -63,7 -62,6 +64,8 @@@ import org.apache.ignite.internal.proce import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; import org.apache.ignite.internal.processors.cache.ExchangeActions; import org.apache.ignite.internal.processors.cache.ExchangeContext; +import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; ++import org.apache.ignite.internal.processors.cache.ExchangeContext; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@@ -483,53 -495,21 +504,53 @@@ public class GridDhtPartitionsExchangeF } } else { - if (discoEvt.type() == EVT_NODE_JOINED) { - if (!discoEvt.eventNode().isLocal()) { - Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches( - discoEvt.eventNode().id(), - topVer); + if (exchCtx.coalescing()) { + if (discoEvt.type() == EVT_NODE_JOINED) { + if (discoEvt.eventNode().isLocal()) { + localJoin(); + + if (crdNode) { + exchange = ExchangeType.NONE; + } + else + sendLocalJoinMessage(crd); + } + else { + if (CU.clientNode(discoEvt.eventNode())) { + onClientNodeEvent(crdNode); + + exchange = ExchangeType.NONE; + } + else { + if (cctx.kernalContext().clientNode()) + exchange = ExchangeType.CLIENT; + else { + + } + } + } + } + else { + + } + } + else { + if (discoEvt.type() == EVT_NODE_JOINED) { + if (!discoEvt.eventNode().isLocal()) { + Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches( + discoEvt.eventNode().id(), + topVer); - cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); - } - else - localJoin(); + cctx.affinity().initStartedCaches(crdNode, this, receivedCaches); } + else + initCachesOnLocalJoin(); + } - exchange = CU.clientNode(discoEvt.eventNode()) ? - onClientNodeEvent(crdNode) : - onServerNodeEvent(crdNode); + exchange = CU.clientNode(discoEvt.eventNode()) ? + onClientNodeEvent(crdNode) : + onServerNodeEvent(crdNode); + } } updateTopologies(crdNode); @@@ -1527,28 -1423,13 +1531,32 @@@ ", fut=" + this + ']'); if (!centralizedAff) - sendAllPartitions(node.id(), cctx.gridConfig().getNetworkSendRetryCount()); + sendAllPartitions(msg, node.id(), cctx.gridConfig().getNetworkSendRetryCount()); } else { + assert !msg.client() : msg; + + updateLastVersion(msg.lastVersion()); + + GridDhtPartitionsExchangeFuture mergedWith0 = null; + + synchronized (this) { + if (mergedWith != null) + mergedWith0 = mergedWith; + else { + if (pendingMsgs == null) + pendingMsgs = new ArrayList<>(); + + pendingMsgs.add(new T2<>(node, msg)); + } + } + + if (mergedWith0 != null) { + mergedWith0.onReceive(node, msg); + + return; + } + initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> f) { try { http://git-wip-us.apache.org/repos/asf/ignite/blob/7970ff77/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 6930c28,edc9c9e..bd9eaf2 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@@ -25,15 -26,15 +26,17 @@@ import java.util.Map import java.util.Set; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; + import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; + import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@@ -102,11 -103,8 +105,11 @@@ public class GridDhtPartitionsFullMessa private transient boolean compress; /** */ + private AffinityTopologyVersion resTopVer; + + /** */ - @GridDirectMap(keyType = Integer.class, valueType = GridLongList.class) - private Map<Integer, GridLongList> cachesAff; + @GridDirectCollection(CacheGroupAffinityMessage.class) + private Collection<CacheGroupAffinityMessage> cachesAff; /** * Required by {@link Externalizable}.