Repository: ignite Updated Branches: refs/heads/ignite-2.5 5cd32329f -> 442d87d57
IGNITE-8243 Fixed possible memory leak. Added latch manager to diagnostic messages. - Fixes #3850. Signed-off-by: dpavlov <[email protected]> (cherry picked from commit 6c6f094) Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/442d87d5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/442d87d5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/442d87d5 Branch: refs/heads/ignite-2.5 Commit: 442d87d5799c27ef7258c7ac36e3e0b312676713 Parents: 5cd3232 Author: Pavel Kovalenko <[email protected]> Authored: Wed Apr 18 21:03:32 2018 +0300 Committer: dpavlov <[email protected]> Committed: Wed Apr 18 21:06:02 2018 +0300 ---------------------------------------------------------------------- .../GridCachePartitionExchangeManager.java | 2 ++ .../preloader/latch/ExchangeLatchManager.java | 27 ++++++++++++++------ 2 files changed, 21 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/442d87d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 20a3ccb..7ea161a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1633,6 +1633,8 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } } + U.warn(diagnosticLog, "Latch manager state: " + latchMgr); + dumpPendingObjects(exchTopVer, diagCtx); for (CacheGroupContext grp : cctx.cache().cacheGroups()) http://git-wip-us.apache.org/repos/asf/ignite/blob/442d87d5/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java index 404f88f..b9c7dee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java @@ -64,21 +64,26 @@ public class ExchangeLatchManager { private final GridKernalContext ctx; /** Discovery manager. */ + @GridToStringExclude private final GridDiscoveryManager discovery; /** IO manager. */ + @GridToStringExclude private final GridIoManager io; /** Current coordinator. */ - private volatile ClusterNode coordinator; + @GridToStringExclude + private volatile ClusterNode crd; /** Pending acks collection. */ private final ConcurrentMap<T2<String, AffinityTopologyVersion>, Set<UUID>> pendingAcks = new ConcurrentHashMap<>(); /** Server latches collection. */ + @GridToStringInclude private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ServerLatch> serverLatches = new ConcurrentHashMap<>(); /** Client latches collection. */ + @GridToStringInclude private final ConcurrentMap<T2<String, AffinityTopologyVersion>, ClientLatch> clientLatches = new ConcurrentHashMap<>(); /** Lock. */ @@ -97,15 +102,14 @@ public class ExchangeLatchManager { if (!ctx.clientNode()) { ctx.io().addMessageListener(GridTopic.TOPIC_EXCHANGE, (nodeId, msg, plc) -> { - if (msg instanceof LatchAckMessage) { + if (msg instanceof LatchAckMessage) processAck(nodeId, (LatchAckMessage) msg); - } }); // First coordinator initialization. ctx.discovery().localJoinFuture().listen(f -> { if (f.error() == null) - this.coordinator = getLatchCoordinator(AffinityTopologyVersion.NONE); + this.crd = getLatchCoordinator(AffinityTopologyVersion.NONE); }); ctx.event().addDiscoveryEventListener((e, cache) -> { @@ -288,6 +292,8 @@ public class ExchangeLatchManager { pendingAcks.computeIfAbsent(latchId, (id) -> new GridConcurrentHashSet<>()); pendingAcks.get(latchId).add(from); } + else if (coordinator.isLocal()) + serverLatches.remove(latchId); } else { if (log.isDebugEnabled()) log.debug("Process ack [latch=" + latchId + ", from=" + from + "]"); @@ -319,7 +325,7 @@ public class ExchangeLatchManager { */ private void becomeNewCoordinator() { if (log.isInfoEnabled()) - log.info("Become new coordinator " + coordinator.id()); + log.info("Become new coordinator " + crd.id()); List<T2<String, AffinityTopologyVersion>> latchesToRestore = new ArrayList<>(); latchesToRestore.addAll(pendingAcks.keySet()); @@ -347,7 +353,7 @@ public class ExchangeLatchManager { * @param left Left node. */ private void processNodeLeft(ClusterNode left) { - assert this.coordinator != null : "Coordinator is not initialized"; + assert this.crd != null : "Coordinator is not initialized"; lock.lock(); @@ -402,8 +408,8 @@ public class ExchangeLatchManager { } // Coordinator is changed. - if (coordinator.isLocal() && this.coordinator.id() != coordinator.id()) { - this.coordinator = coordinator; + if (coordinator.isLocal() && this.crd.id() != coordinator.id()) { + this.crd = coordinator; becomeNewCoordinator(); } @@ -693,4 +699,9 @@ public class ExchangeLatchManager { return S.toString(CompletableLatch.class, this); } } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ExchangeLatchManager.class, this); + } }
