Exchange future cleanup: added dedicated exchange worker tasks for rebalance reassign and force rebalance.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/1be9b40c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/1be9b40c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/1be9b40c Branch: refs/heads/ignite-5578 Commit: 1be9b40c37efcbf332ebeeefc865c2fe864339e7 Parents: ecfbc2c Author: sboikov <sboi...@gridgain.com> Authored: Tue Jul 11 12:42:54 2017 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Jul 11 12:42:54 2017 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 140 +++++++++++-------- .../processors/cache/GridCachePreloader.java | 8 +- .../cache/GridCachePreloaderAdapter.java | 4 +- .../preloader/ForceRebalanceExchangeTask.java | 58 ++++++++ .../dht/preloader/GridDhtPartitionDemander.java | 22 +-- .../preloader/GridDhtPartitionExchangeId.java | 61 +++++++- .../GridDhtPartitionsExchangeFuture.java | 136 +----------------- .../dht/preloader/GridDhtPreloader.java | 33 ++--- .../preloader/GridDhtPreloaderAssignments.java | 21 ++- .../RebalanceReassignExchangeTask.java | 44 ++++++ 10 files changed, 291 insertions(+), 236 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index de0adc7..d4fe93f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCach import org.apache.ignite.internal.processors.cache.distributed.dht.GridClientPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.ForceRebalanceExchangeTask; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; @@ -77,6 +78,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.Gri import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionHistorySuppliersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtPartitionsToReloadMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.RebalanceReassignExchangeTask; import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; @@ -371,7 +373,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana "Node joined with smaller-than-local " + "order [newOrder=" + n.order() + ", locOrder=" + loc.order() + ']'; - exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + 
exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); exchFut = exchangeFuture(exchId, evt, cache,null, null); } @@ -384,7 +386,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ExchangeActions exchActions = stateChangeMsg.exchangeActions(); if (exchActions != null) { - exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); exchFut = exchangeFuture(exchId, evt, cache, exchActions, null); } @@ -395,7 +397,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana ExchangeActions exchActions = batch.exchangeActions(); if (exchActions != null) { - exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); exchFut = exchangeFuture(exchId, evt, cache, exchActions, null); } @@ -405,7 +407,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (msg.exchangeId() == null) { if (msg.exchangeNeeded()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); exchFut = exchangeFuture(exchId, evt, cache, null, msg); } @@ -416,7 +418,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } else if (customMsg instanceof SnapshotDiscoveryMessage && ((SnapshotDiscoveryMessage) customMsg).needExchange()) { - exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt.type()); + exchId = exchangeId(n.id(), affinityTopologyVersion(evt), evt); exchFut = exchangeFuture(exchId, evt, null, null, null); } @@ -480,7 +482,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana assert discoEvt.topologyVersion() == startTopVer.topologyVersion(); - return exchangeId(cctx.localNode().id(), startTopVer, EVT_NODE_JOINED); + return exchangeId(cctx.localNode().id(), startTopVer, discoEvt); } 
/** @@ -845,27 +847,18 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** - * @param exchFut Exchange future. - * @param reassign Dummy reassign flag. + * @param exchId Exchange ID. */ - public void forceDummyExchange(boolean reassign, - GridDhtPartitionsExchangeFuture exchFut) { - exchWorker.addExchangeFuture( - new GridDhtPartitionsExchangeFuture(cctx, reassign, exchFut.discoveryEvent(), exchFut.exchangeId())); + public void forceReassign(GridDhtPartitionExchangeId exchId) { + exchWorker.forceReassign(exchId); } /** - * Forces preload exchange. - * - * @param exchFut Exchange future. + * @param exchId Exchange ID. + * @return Rebalance future. */ - public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionsExchangeFuture exchFut) { - GridCompoundFuture<Boolean, Boolean> fut = new GridCompoundFuture<>(CU.boolReducer()); - - exchWorker.addExchangeFuture( - new GridDhtPartitionsExchangeFuture(cctx, exchFut.discoveryEvent(), exchFut.exchangeId(), fut)); - - return fut; + public IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionExchangeId exchId) { + return exchWorker.forceRebalance(exchId); } /** @@ -1185,10 +1178,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana /** * @param nodeId Cause node ID. * @param topVer Topology version. - * @param evt Event type. - * @return Activity future ID. + * @param evt Event. + * @return Exchange ID instance. */ - private GridDhtPartitionExchangeId exchangeId(UUID nodeId, AffinityTopologyVersion topVer, int evt) { + private GridDhtPartitionExchangeId exchangeId(UUID nodeId, AffinityTopologyVersion topVer, DiscoveryEvent evt) { return new GridDhtPartitionExchangeId(nodeId, evt, topVer); } @@ -1635,7 +1628,9 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana * @return {@code True} if this is exchange task. 
*/ private static boolean isExchangeTask(CachePartitionExchangeWorkerTask task) { - return task instanceof GridDhtPartitionsExchangeFuture; + return task instanceof GridDhtPartitionsExchangeFuture || + task instanceof RebalanceReassignExchangeTask || + task instanceof ForceRebalanceExchangeTask; } /** @@ -1745,13 +1740,32 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * @param exchId Exchange ID. + */ + void forceReassign(GridDhtPartitionExchangeId exchId) { + if (!hasPendingExchange() && !busy) + futQ.add(new RebalanceReassignExchangeTask(exchId)); + } + + /** + * @param exchId Exchange ID. + * @return Rebalance future. + */ + IgniteInternalFuture<Boolean> forceRebalance(GridDhtPartitionExchangeId exchId) { + GridCompoundFuture<Boolean, Boolean> fut = new GridCompoundFuture<>(CU.boolReducer()); + + futQ.add(new ForceRebalanceExchangeTask(exchId, fut)); + + return fut; + } + + /** * @param exchFut Exchange future. */ void addExchangeFuture(GridDhtPartitionsExchangeFuture exchFut) { assert exchFut != null; - if (!exchFut.dummy() || (!hasPendingExchange() && !busy)) - futQ.offer(exchFut); + futQ.offer(exchFut); if (log.isDebugEnabled()) log.debug("Added exchange future to exchange worker: " + exchFut); @@ -1880,22 +1894,37 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana continue; } - assert task instanceof GridDhtPartitionsExchangeFuture; - - GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture)task; - busy = true; Map<Integer, GridDhtPreloaderAssignments> assignsMap = null; - boolean dummyReassign = exchFut.dummyReassign(); - boolean forcePreload = exchFut.forcePreload(); + boolean forcePreload = false; + + GridDhtPartitionExchangeId exchId; + + GridDhtPartitionsExchangeFuture exchFut = null; try { if (isCancelled()) break; - if (!exchFut.dummy() && !exchFut.forcePreload()) { + if (task instanceof RebalanceReassignExchangeTask) { + exchId = 
((RebalanceReassignExchangeTask) task).exchangeId(); + } + else if (task instanceof ForceRebalanceExchangeTask) { + forcePreload = true; + + timeout = 0; // Force refresh. + + exchId = ((ForceRebalanceExchangeTask)task).exchangeId(); + } + else { + assert task instanceof GridDhtPartitionsExchangeFuture : task; + + exchFut = (GridDhtPartitionsExchangeFuture)task; + + exchId = exchFut.exchangeId(); + lastInitializedFut = exchFut; exchFut.init(); @@ -1961,18 +1990,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!cctx.kernalContext().clientNode() && changed && !hasPendingExchange()) refreshPartitions(); } - else { - if (log.isDebugEnabled()) - log.debug("Got dummy exchange (will reassign)"); - if (!dummyReassign) { - timeout = 0; // Force refresh. - - continue; - } - } - - if (!exchFut.skipPreload() ) { + if (!cctx.kernalContext().clientNode()) { assignsMap = new HashMap<>(); for (CacheGroupContext grp : cctx.cache().cacheGroups()) { @@ -1982,7 +2001,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana // Don't delay for dummy reassigns to avoid infinite recursion. 
if (delay == 0 || forcePreload) - assigns = grp.preloader().assign(exchFut); + assigns = grp.preloader().assign(exchId, exchFut); assignsMap.put(grp.groupId(), assigns); } @@ -2017,6 +2036,11 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana boolean assignsCancelled = false; + GridCompoundFuture<Boolean, Boolean> forcedRebFut = null; + + if (task instanceof ForceRebalanceExchangeTask) + forcedRebFut = ((ForceRebalanceExchangeTask)task).forcedRebalanceFuture(); + for (Integer order : orderMap.descendingKeySet()) { for (Integer grpId : orderMap.get(order)) { CacheGroupContext grp = cctx.cache().cacheGroup(grpId); @@ -2035,7 +2059,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana forcePreload, cnt, r, - exchFut.forcedRebalanceFuture()); + forcedRebFut); if (cur != null) { rebList.add(grp.cacheOrGroupName()); @@ -2045,13 +2069,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } - if (exchFut.forcedRebalanceFuture() != null) - exchFut.forcedRebalanceFuture().markInitialized(); + if (forcedRebFut != null) + forcedRebFut.markInitialized(); if (assignsCancelled) { // Pending exchange. 
U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() + + ", node=" + exchId.nodeId() + ']'); } else if (r != null) { Collections.reverse(rebList); @@ -2060,20 +2084,20 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (!hasPendingExchange()) { U.log(log, "Rebalancing started " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() + + ", node=" + exchId.nodeId() + ']'); r.run(); // Starts rebalancing routine. } else U.log(log, "Skipping rebalancing (obsolete exchange ID) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() + + ", node=" + exchId.nodeId() + ']'); } else U.log(log, "Skipping rebalancing (nothing scheduled) " + - "[top=" + exchFut.topologyVersion() + ", evt=" + exchFut.discoveryEvent().name() + - ", node=" + exchFut.discoveryEvent().eventNode().id() + ']'); + "[top=" + exchId.topologyVersion() + ", evt=" + exchId.discoveryEventName() + + ", node=" + exchId.nodeId() + ']'); } } catch (IgniteInterruptedCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java index 4e74532..dc1624a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloader.java @@ -26,6 +26,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; @@ -63,10 +64,12 @@ public interface GridCachePreloader { public void onInitialExchangeComplete(@Nullable Throwable err); /** - * @param exchFut Exchange future to assign. + * @param exchId Exchange ID. + * @param exchFut Exchange future. * @return Assignments or {@code null} if detected that there are pending exchanges. */ - @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut); + @Nullable public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, + @Nullable GridDhtPartitionsExchangeFuture exchFut); /** * Adds assignments to preloader. @@ -75,6 +78,7 @@ public interface GridCachePreloader { * @param forcePreload Force preload flag. * @param cnt Counter. * @param next Runnable responsible for cache rebalancing start. + * @param forcedRebFut Rebalance future. 
* @return Rebalancing runnable. */ public Runnable addAssignments(GridDhtPreloaderAssignments assignments, http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java index d2ca229..c0accf6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePreloaderAdapter.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloaderAssignments; @@ -152,7 +153,8 @@ public class GridCachePreloaderAdapter implements GridCachePreloader { } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, + GridDhtPartitionsExchangeFuture exchFut) { return 
null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java new file mode 100644 index 0000000..c820175 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/ForceRebalanceExchangeTask.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; +import org.apache.ignite.internal.util.future.GridCompoundFuture; + +/** + * + */ +public class ForceRebalanceExchangeTask implements CachePartitionExchangeWorkerTask { + /** */ + private final GridDhtPartitionExchangeId exchId; + + /** */ + private final GridCompoundFuture<Boolean, Boolean> forcedRebFut; + + /** + * @param exchId Exchange ID. + * @param forcedRebFut Rebalance future. + */ + public ForceRebalanceExchangeTask(GridDhtPartitionExchangeId exchId, GridCompoundFuture<Boolean, Boolean> forcedRebFut) { + assert exchId != null; + assert forcedRebFut != null; + + this.exchId = exchId; + this.forcedRebFut = forcedRebFut; + } + + /** + * @return Exchange ID. + */ + public GridDhtPartitionExchangeId exchangeId() { + return exchId; + } + + /** + * @return Rebalance future. + */ + public GridCompoundFuture<Boolean, Boolean> forcedRebalanceFuture() { + return forcedRebFut; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java index 4f34aba..248b739 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java @@ -189,7 +189,7 @@ public class GridDhtPartitionDemander { } /** - * Force Rebalance. 
+ * @return Rebalance future. */ IgniteInternalFuture<Boolean> forceRebalance() { GridTimeoutObject obj = lastTimeoutObj.getAndSet(null); @@ -207,7 +207,7 @@ public class GridDhtPartitionDemander { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { - IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut); + IgniteInternalFuture<Boolean> fut0 = ctx.exchange().forceRebalance(exchFut.exchangeId()); fut0.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() { @Override public void apply(IgniteInternalFuture<Boolean> future) { @@ -363,7 +363,7 @@ public class GridDhtPartitionDemander { @Override public void onTimeout() { exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) { - ctx.exchange().forceRebalance(exchFut); + ctx.exchange().forceRebalance(exchFut.exchangeId()); } }); } @@ -861,9 +861,9 @@ public class GridDhtPartitionDemander { /** Missed. */ private final Map<UUID, Collection<Integer>> missed = new HashMap<>(); - /** Exchange future. */ + /** Exchange ID. */ @GridToStringExclude - private final GridDhtPartitionsExchangeFuture exchFut; + private final GridDhtPartitionExchangeId exchId; /** Topology version. */ private final AffinityTopologyVersion topVer; @@ -884,7 +884,7 @@ public class GridDhtPartitionDemander { long updateSeq) { assert assigns != null; - exchFut = assigns.exchangeFuture(); + exchId = assigns.exchangeId(); topVer = assigns.topologyVersion(); this.grp = grp; @@ -898,7 +898,7 @@ public class GridDhtPartitionDemander { * Dummy future. Will be done by real one. 
*/ RebalanceFuture() { - this.exchFut = null; + this.exchId = null; this.topVer = null; this.ctx = null; this.grp = null; @@ -1032,7 +1032,7 @@ public class GridDhtPartitionDemander { return; if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_LOADED)) - rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchFut.discoveryEvent()); + rebalanceEvent(p, EVT_CACHE_REBALANCE_PART_LOADED, exchId.discoveryEvent()); T2<Long, Collection<Integer>> t = remaining.get(nodeId); @@ -1108,7 +1108,7 @@ public class GridDhtPartitionDemander { onDone(false); //Finished but has missed partitions, will force dummy exchange - ctx.exchange().forceDummyExchange(true, exchFut); + ctx.exchange().forceReassign(exchId); return; } @@ -1125,7 +1125,7 @@ public class GridDhtPartitionDemander { */ private void sendRebalanceStartedEvent() { if (grp.eventRecordable(EVT_CACHE_REBALANCE_STARTED)) - rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent()); + rebalanceEvent(EVT_CACHE_REBALANCE_STARTED, exchId.discoveryEvent()); } /** @@ -1133,7 +1133,7 @@ public class GridDhtPartitionDemander { */ private void sendRebalanceFinishedEvent() { if (grp.eventRecordable(EVT_CACHE_REBALANCE_STOPPED)) - rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent()); + rebalanceEvent(EVT_CACHE_REBALANCE_STOPPED, exchId.discoveryEvent()); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java index 012634e..0a49415 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionExchangeId.java @@ -23,6 +23,9 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.nio.ByteBuffer; import java.util.UUID; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -55,20 +58,27 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa /** Topology version. */ private AffinityTopologyVersion topVer; + /** */ + @GridDirectTransient + private DiscoveryEvent discoEvt; + /** * @param nodeId Node ID. - * @param evt Event. + * @param discoEvt Event. * @param topVer Topology version. */ - public GridDhtPartitionExchangeId(UUID nodeId, int evt, @NotNull AffinityTopologyVersion topVer) { + public GridDhtPartitionExchangeId(UUID nodeId, DiscoveryEvent discoEvt, AffinityTopologyVersion topVer) { assert nodeId != null; - assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED || - evt == EVT_DISCOVERY_CUSTOM_EVT; - assert topVer.topologyVersion() > 0; + assert topVer != null && topVer.topologyVersion() > 0 : topVer; + assert discoEvt != null; this.nodeId = nodeId; - this.evt = evt; + this.evt = discoEvt.type(); this.topVer = topVer; + this.discoEvt = discoEvt; + + assert evt == EVT_NODE_LEFT || evt == EVT_NODE_FAILED || evt == EVT_NODE_JOINED || + evt == EVT_DISCOVERY_CUSTOM_EVT; } /** @@ -93,6 +103,45 @@ public class GridDhtPartitionExchangeId implements Message, Comparable<GridDhtPa } /** + * @return Discovery event timestamp. 
+ */ + long eventTimestamp() { + assert discoEvt != null; + + return discoEvt.timestamp(); + } + + /** + * @param discoEvt Discovery event. + */ + void discoveryEvent(DiscoveryEvent discoEvt) { + this.discoEvt = discoEvt; + } + + /** + * @return Discovery event. + */ + DiscoveryEvent discoveryEvent() { + assert discoEvt != null; + + return discoEvt; + } + + /** + * @return Discovery event node. + */ + public ClusterNode eventNode() { + return discoEvt.eventNode(); + } + + /** + * @return Discovery event name. + */ + public String discoveryEventName() { + return U.gridEventName(evt); + } + + /** * @return Order. */ public AffinityTopologyVersion topologyVersion() { http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 5760f87..8989aaa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -75,7 +75,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; -import org.apache.ignite.internal.util.future.GridCompoundFuture; import 
org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -110,14 +109,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte implements Comparable<GridDhtPartitionsExchangeFuture>, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware { /** */ public static final String EXCHANGE_LOG = "org.apache.ignite.internal.exchange.time"; - /** Dummy flag. */ - private final boolean dummy; - - /** Force preload flag. */ - private final boolean forcePreload; - - /** Dummy reassign flag. */ - private final boolean reassign; /** */ @GridToStringExclude @@ -196,9 +187,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private CacheAffinityChangeMessage affChangeMsg; - /** Skip preload flag. */ - private boolean skipPreload; - /** */ private boolean clientOnlyExchange; @@ -221,9 +209,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte @GridToStringExclude private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap(); - /** Forced Rebalance future. */ - private GridCompoundFuture<Boolean, Boolean> forcedRebFut; - /** */ private volatile Map<Integer, Map<Integer, Long>> partHistReserved; @@ -235,62 +220,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte private final AtomicBoolean done = new AtomicBoolean(); /** - * Dummy future created to trigger reassignments if partition - * topology changed while preloading. - * - * @param cctx Cache context. - * @param reassign Dummy reassign flag. - * @param discoEvt Discovery event. - * @param exchId Exchange id. 
- */ - public GridDhtPartitionsExchangeFuture( - GridCacheSharedContext cctx, - boolean reassign, - DiscoveryEvent discoEvt, - GridDhtPartitionExchangeId exchId - ) { - dummy = true; - forcePreload = false; - - this.exchId = exchId; - this.reassign = reassign; - this.discoEvt = discoEvt; - this.cctx = cctx; - - log = cctx.logger(getClass()); - exchLog = cctx.logger(EXCHANGE_LOG); - - onDone(exchId.topologyVersion()); - } - - /** - * Force preload future created to trigger reassignments if partition - * topology changed while preloading. - * - * @param cctx Cache context. - * @param discoEvt Discovery event. - * @param exchId Exchange id. - * @param forcedRebFut Forced Rebalance future. - */ - public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, DiscoveryEvent discoEvt, - GridDhtPartitionExchangeId exchId, GridCompoundFuture<Boolean, Boolean> forcedRebFut) { - dummy = false; - forcePreload = true; - - this.exchId = exchId; - this.discoEvt = discoEvt; - this.cctx = cctx; - this.forcedRebFut = forcedRebFut; - - log = cctx.logger(getClass()); - exchLog = cctx.logger(EXCHANGE_LOG); - - reassign = true; - - onDone(exchId.topologyVersion()); - } - - /** * @param cctx Cache context. * @param busyLock Busy lock. * @param exchId Exchange ID. @@ -309,10 +238,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert exchId.topologyVersion() != null; assert exchActions == null || !exchActions.empty(); - dummy = false; - forcePreload = false; - reassign = false; - this.cctx = cctx; this.busyLock = busyLock; this.exchId = exchId; @@ -351,41 +276,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @return Skip preload flag. - */ - public boolean skipPreload() { - return skipPreload; - } - - /** - * @return Dummy flag. - */ - public boolean dummy() { - return dummy; - } - - /** - * @return Force preload flag. 
- */ - public boolean forcePreload() { - return forcePreload; - } - - /** - * @return Dummy reassign flag. - */ - public boolean reassign() { - return reassign; - } - - /** - * @return {@code True} if dummy reassign. - */ - public boolean dummyReassign() { - return (dummy() || forcePreload()) && reassign(); - } - - /** * @param grpId Cache group ID. * @param partId Partition ID. * @return ID of history supplier node or null if it doesn't exist. @@ -453,6 +343,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) { assert exchId.equals(this.exchId); + this.exchId.discoveryEvent(discoEvt); this.discoEvt = discoEvt; this.discoCache = discoCache; @@ -476,7 +367,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @return {@code True} if deactivate cluster exchange. */ - boolean deactivateCluster() { + private boolean deactivateCluster() { return exchActions != null && exchActions.deactivate(); } @@ -495,13 +386,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } /** - * @return Forced Rebalance future. - */ - @Nullable public GridCompoundFuture<Boolean, Boolean> forcedRebalanceFuture() { - return forcedRebFut; - } - - /** * @return {@code true} if entered to busy state. 
*/ private boolean enterBusy() { @@ -538,7 +422,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte assert discoEvt != null : this; assert exchId.nodeId().equals(discoEvt.eventNode().id()) : this; - assert !dummy && !forcePreload : this; try { discoCache.updateAlives(cctx.discovery()); @@ -553,8 +436,6 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte boolean crdNode = crd != null && crd.isLocal(); - skipPreload = cctx.kernalContext().clientNode(); - exchLog.info("Started exchange init [topVer=" + topVer + ", crd=" + crdNode + ", evt=" + discoEvt.type() + @@ -1330,13 +1211,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** {@inheritDoc} */ @Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) { - boolean realExchange = !dummy && !forcePreload; - if (!done.compareAndSet(false, true)) - return dummy; + return false; if (err == null && - realExchange && !cctx.kernalContext().clientNode() && (serverNodeDiscoveryEvent() || affChangeMsg != null)) { for (GridCacheContext cacheCtx : cctx.cacheContexts()) { @@ -1347,7 +1225,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte } } - if (err == null && realExchange) { + if (err == null) { if (centralizedAff) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (grp.isLocal()) @@ -1417,14 +1295,14 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte cctx.database().releaseHistoryForExchange(); - if (err == null && realExchange) { + if (err == null) { for (CacheGroupContext grp : cctx.cache().cacheGroups()) { if (!grp.isLocal()) grp.topology().onExchangeDone(grp.affinity().cachedAffinity(topologyVersion())); } } - if (super.onDone(res, err) && realExchange) { + if (super.onDone(res, err)) { if (log.isDebugEnabled()) log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + 
this + ", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']'); @@ -1446,7 +1324,7 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte return true; } - return dummy; + return false; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java index 2b18c24..7efd4aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloader.java @@ -179,22 +179,21 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { } /** {@inheritDoc} */ - @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionsExchangeFuture exchFut) { + @Override public GridDhtPreloaderAssignments assign(GridDhtPartitionExchangeId exchId, GridDhtPartitionsExchangeFuture exchFut) { // No assignments for disabled preloader. 
GridDhtPartitionTopology top = grp.topology(); if (!grp.rebalanceEnabled()) - return new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); + return new GridDhtPreloaderAssignments(exchId, top.topologyVersion()); int partCnt = grp.affinity().partitions(); - assert exchFut.forcePreload() || exchFut.dummyReassign() || - exchFut.exchangeId().topologyVersion().equals(top.topologyVersion()) : - "Topology version mismatch [exchId=" + exchFut.exchangeId() + + assert exchFut == null || exchFut.topologyVersion().equals(top.topologyVersion()) : + "Topology version mismatch [exchId=" + exchId + ", grp=" + grp.name() + ", topVer=" + top.topologyVersion() + ']'; - GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchFut, top.topologyVersion()); + GridDhtPreloaderAssignments assigns = new GridDhtPreloaderAssignments(exchId, top.topologyVersion()); AffinityTopologyVersion topVer = assigns.topologyVersion(); @@ -204,7 +203,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (ctx.exchange().hasPendingExchange()) { if (log.isDebugEnabled()) log.debug("Skipping assignments creation, exchange worker has pending assignments: " + - exchFut.exchangeId()); + exchId); assigns.cancelled(true); @@ -220,7 +219,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { ClusterNode histSupplier = null; - if (ctx.database().persistenceEnabled()) { + if (ctx.database().persistenceEnabled() && exchFut != null) { UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p); if (nodeId != null) @@ -243,7 +242,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (msg == null) { assigns.put(histSupplier, msg = new GridDhtPartitionDemandMessage( top.updateSequence(), - exchFut.exchangeId().topologyVersion(), + exchId.topologyVersion(), grp.groupId())); } @@ -292,14 +291,12 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { top.own(part); if 
(grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { - DiscoveryEvent discoEvt = exchFut.discoveryEvent(); - - grp.addRebalanceEvent(p, - EVT_CACHE_REBALANCE_PART_DATA_LOST, - discoEvt.eventNode(), - discoEvt.type(), - discoEvt.timestamp()); - } + grp.addRebalanceEvent(p, + EVT_CACHE_REBALANCE_PART_DATA_LOST, + exchId.eventNode(), + exchId.event(), + exchId.eventTimestamp()); + } if (log.isDebugEnabled()) log.debug("Owning partition as there are no other owners: " + part); @@ -312,7 +309,7 @@ public class GridDhtPreloader extends GridCachePreloaderAdapter { if (msg == null) { assigns.put(n, msg = new GridDhtPartitionDemandMessage( top.updateSequence(), - exchFut.exchangeId().topologyVersion(), + exchId.topologyVersion(), grp.groupId())); } http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java index 3f82c9b..41dd076 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPreloaderAssignments.java @@ -30,25 +30,24 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, /** */ private static final long serialVersionUID = 0L; - /** Exchange future. */ - @GridToStringExclude - private final GridDhtPartitionsExchangeFuture exchFut; + /** */ + private final GridDhtPartitionExchangeId exchangeId; - /** Last join order. 
*/ + /** */ private final AffinityTopologyVersion topVer; /** */ private boolean cancelled; /** - * @param exchFut Exchange future. + * @param exchangeId Exchange ID. * @param topVer Last join order. */ - public GridDhtPreloaderAssignments(GridDhtPartitionsExchangeFuture exchFut, AffinityTopologyVersion topVer) { - assert exchFut != null; + public GridDhtPreloaderAssignments(GridDhtPartitionExchangeId exchangeId, AffinityTopologyVersion topVer) { + assert exchangeId != null; assert topVer.topologyVersion() > 0 : topVer; - this.exchFut = exchFut; + this.exchangeId = exchangeId; this.topVer = topVer; } @@ -69,8 +68,8 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, /** * @return Exchange future. */ - GridDhtPartitionsExchangeFuture exchangeFuture() { - return exchFut; + GridDhtPartitionExchangeId exchangeId() { + return exchangeId; } /** @@ -82,7 +81,7 @@ public class GridDhtPreloaderAssignments extends ConcurrentHashMap<ClusterNode, /** {@inheritDoc} */ @Override public String toString() { - return S.toString(GridDhtPreloaderAssignments.class, this, "exchId", exchFut.exchangeId(), + return S.toString(GridDhtPreloaderAssignments.class, this, "exchId", exchangeId, "super", super.toString()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/1be9b40c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java new file mode 100644 index 0000000..9a76a8e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/RebalanceReassignExchangeTask.java @@ -0,0 +1,44 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; + +import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask; + +/** + * Exchange worker task for a rebalance reassign request; it only stores and exposes the exchange ID it was created with. + */ +public class RebalanceReassignExchangeTask implements CachePartitionExchangeWorkerTask { + /** Exchange ID this task was created for. */ + private final GridDhtPartitionExchangeId exchId; + + /** + * @param exchId Exchange ID, must not be {@code null} (checked by assertion only). + */ + public RebalanceReassignExchangeTask(GridDhtPartitionExchangeId exchId) { + assert exchId != null; + + this.exchId = exchId; + } + + /** + * @return Exchange ID passed to the constructor. + */ + public GridDhtPartitionExchangeId exchangeId() { + return exchId; + } +}