IGNITE-5774 - Added more information about long-running partition release future
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a71691ae Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a71691ae Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a71691ae Branch: refs/heads/ignite-5578-locJoin Commit: a71691ae22e30eea775324363d13e8875be8c4d9 Parents: 0d6b730 Author: Alexey Goncharuk <[email protected]> Authored: Tue Jul 18 12:37:14 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Jul 18 12:37:14 2017 +0300 ---------------------------------------------------------------------- .../apache/ignite/IgniteSystemProperties.java | 7 +++ .../cache/CacheObjectsReleaseFuture.java | 60 ++++++++++++++++++++ .../cache/GridCacheExplicitLockSpan.java | 10 +++- .../processors/cache/GridCacheMvccManager.java | 24 +++++--- .../cache/GridCacheSharedContext.java | 4 +- .../GridDhtPartitionsExchangeFuture.java | 30 +++++++--- .../cache/transactions/IgniteTxAdapter.java | 44 ++++++++++++-- .../cache/transactions/IgniteTxManager.java | 5 +- .../util/future/GridCompoundFuture.java | 6 +- 9 files changed, 161 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 1a2887a..616ac3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -677,6 +677,13 @@ public final class IgniteSystemProperties { "IGNITE_CLIENT_CACHE_CHANGE_MESSAGE_TIMEOUT"; /** + * If a partition release future completion time during an exchange exceeds this threshold, the contents of + * the future will be dumped to the log on exchange. Default is {@code 0} (disabled). + */ + public static final String IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD = + "IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectsReleaseFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectsReleaseFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectsReleaseFuture.java new file mode 100644 index 0000000..f7af009 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheObjectsReleaseFuture.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.lang.IgniteReducer; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class CacheObjectsReleaseFuture<T, R> extends GridCompoundFuture<T, R> { + /** */ + private AffinityTopologyVersion topVer; + + /** */ + private String type; + + /** + * @param type Wait object type. + * @param topVer Topology version to wait for. + */ + public CacheObjectsReleaseFuture(String type, AffinityTopologyVersion topVer) { + this.type = type; + this.topVer = topVer; + } + + /** + * @param type Wait object type. + * @param topVer Topology version to wait for. + * @param rdc Reducer object. + */ + public CacheObjectsReleaseFuture(String type, AffinityTopologyVersion topVer, @Nullable IgniteReducer<T, R> rdc) { + super(rdc); + + this.topVer = topVer; + this.type = type; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return type + "ReleaseFuture [topVer=" + topVer + ", futures=" + futures() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java index df32e77..b0cb302 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheExplicitLockSpan.java @@ -51,15 +51,21 @@ public class GridCacheExplicitLockSpan extends ReentrantLock { /** Span lock release future. */ @GridToStringExclude - private final GridFutureAdapter<Object> releaseFut = new GridFutureAdapter<>(); + private final GridFutureAdapter<Object> releaseFut; /** * @param topVer Topology version. * @param cand Candidate. */ - public GridCacheExplicitLockSpan(AffinityTopologyVersion topVer, GridCacheMvccCandidate cand) { + public GridCacheExplicitLockSpan(final AffinityTopologyVersion topVer, final GridCacheMvccCandidate cand) { this.topVer = topVer; + releaseFut = new GridFutureAdapter<Object>() { + @Override public String toString() { + return "ExplicitLockSpan [topVer=" + topVer + ", firstCand=" + cand + "]"; + } + }; + ensureDeque(cand.key()).addFirst(cand); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index b156708..09bf762 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -1048,7 +1048,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @return Explicit locks release future. */ public IgniteInternalFuture<?> finishExplicitLocks(AffinityTopologyVersion topVer) { - GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(); + GridCompoundFuture<Object, Object> res = new CacheObjectsReleaseFuture<>("ExplicitLock", topVer); for (GridCacheExplicitLockSpan span : pendingExplicit.values()) { AffinityTopologyVersion snapshot = span.topologyVersion(); @@ -1069,7 +1069,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { */ @SuppressWarnings("unchecked") public IgniteInternalFuture<?> finishAtomicUpdates(AffinityTopologyVersion topVer) { - GridCompoundFuture<Object, Object> res = new FinishAtomicUpdateFuture(); + GridCompoundFuture<Object, Object> res = new FinishAtomicUpdateFuture("AtomicUpdate", topVer); for (GridCacheAtomicFuture<?> fut : atomicFuts.values()) { IgniteInternalFuture<Void> complete = fut.completeFuture(topVer); @@ -1088,11 +1088,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @return Finish update future. */ @SuppressWarnings("unchecked") - public IgniteInternalFuture<?> finishDataStreamerUpdates() { - GridCompoundFuture<Object, Object> res = new GridCompoundFuture<>(); + public IgniteInternalFuture<?> finishDataStreamerUpdates(AffinityTopologyVersion topVer) { + GridCompoundFuture<Void, Object> res = new CacheObjectsReleaseFuture<>("DataStreamer", topVer); - for (IgniteInternalFuture fut : dataStreamerFuts) - res.add(fut); + for (DataStreamerFuture fut : dataStreamerFuts) { + if (fut.topVer.compareTo(topVer) < 0) + res.add(fut); + } res.markInitialized(); @@ -1340,10 +1342,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { /** * Finish atomic update future. */ - private static class FinishAtomicUpdateFuture extends GridCompoundFuture<Object, Object> { + private static class FinishAtomicUpdateFuture extends CacheObjectsReleaseFuture<Object, Object> { /** */ private static final long serialVersionUID = 0L; + /** + * @param type Type. + * @param topVer Topology version. + */ + private FinishAtomicUpdateFuture(String type, AffinityTopologyVersion topVer) { + super(type, topVer); + } + /** {@inheritDoc} */ @Override protected boolean ignoreFailure(Throwable err) { Class cls = err.getClass(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index efd90a8..5387cc8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -809,12 +809,12 @@ public class GridCacheSharedContext<K, V> { */ @SuppressWarnings({"unchecked"}) public IgniteInternalFuture<?> partitionReleaseFuture(AffinityTopologyVersion topVer) { - GridCompoundFuture f = new GridCompoundFuture(); + GridCompoundFuture f = new CacheObjectsReleaseFuture("Partition", topVer); f.add(mvcc().finishExplicitLocks(topVer)); f.add(tm().finishTxs(topVer)); f.add(mvcc().finishAtomicUpdates(topVer)); - f.add(mvcc().finishDataStreamerUpdates()); + f.add(mvcc().finishDataStreamerUpdates(topVer)); f.markInitialized(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index ea34f75..c4a4f83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReadWriteLock; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.events.CacheEvent; @@ -92,6 +93,7 @@ import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.getBoolean; import static org.apache.ignite.IgniteSystemProperties.getLong; @@ -111,6 +113,10 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte public static final String EXCHANGE_LOG = "org.apache.ignite.internal.exchange.time"; /** */ + private static final int RELEASE_FUTURE_DUMP_THRESHOLD = + IgniteSystemProperties.getInteger(IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD, 0); + + /** */ @GridToStringExclude private volatile DiscoCache discoCache; @@ -623,9 +629,8 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** * @param crd Coordinator flag. * @return Exchange type. - * @throws IgniteCheckedException If failed. */ - private ExchangeType onClusterStateChangeRequest(boolean crd) throws IgniteCheckedException { + private ExchangeType onClusterStateChangeRequest(boolean crd) { assert exchActions != null && !exchActions.empty() : this; StateChangeRequest req = exchActions.stateChangeRequest(); @@ -894,14 +899,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte IgniteInternalFuture fut = cctx.snapshot() .tryStartLocalSnapshotOperation(discoEvt); - if (fut != null) + if (fut != null) { fut.get(); - long end = U.currentTimeMillis(); + long end = U.currentTimeMillis(); - if (log.isInfoEnabled()) - log.info("Snapshot initialization completed [topVer=" + exchangeId().topologyVersion() + - ", time=" + (end - start) + "ms]"); + if (log.isInfoEnabled()) + log.info("Snapshot initialization completed [topVer=" + exchangeId().topologyVersion() + + ", time=" + (end - start) + "ms]"); + } } catch (IgniteCheckedException e) { U.error(log, "Error while starting snapshot operation", e); @@ -955,9 +961,15 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte long waitEnd = U.currentTimeMillis(); - if (log.isInfoEnabled()) + if (log.isInfoEnabled()) { + long waitTime = (waitEnd - waitStart); + + String futInfo = RELEASE_FUTURE_DUMP_THRESHOLD > 0 && waitTime > RELEASE_FUTURE_DUMP_THRESHOLD ? + partReleaseFut.toString() : "NA"; + log.info("Finished waiting for partition release future [topVer=" + exchangeId().topologyVersion() + - ", waitTime=" + (waitEnd - waitStart) + "ms]"); + ", waitTime=" + (waitEnd - waitStart) + "ms, futInfo=" + futInfo + "]"); + } IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 880d9b9..91ce3ce 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -938,11 +938,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement fut = finFut; if (fut == null) { - fut = new GridFutureAdapter<IgniteInternalTx>() { - @Override public String toString() { - return S.toString(GridFutureAdapter.class, this, "tx", IgniteTxAdapter.this); - } - }; + fut = new TxFinishFuture(this); finFut = fut; } @@ -2287,4 +2283,42 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement return S.toString(TxShadow.class, this); } } + + /** + * + */ + private static class TxFinishFuture extends GridFutureAdapter<IgniteInternalTx> { + /** */ + @GridToStringInclude + private IgniteTxAdapter tx; + + /** */ + private volatile long completionTime; + + /** + * @param tx Transaction being awaited. + */ + private TxFinishFuture(IgniteTxAdapter tx) { + this.tx = tx; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) { + completionTime = U.currentTimeMillis(); + + return super.onDone(res, err); + } + + /** {@inheritDoc} */ + @Override public String toString() { + long ct = completionTime; + + if (ct == 0) + ct = U.currentTimeMillis(); + + long duration = ct - tx.startTime(); + + return S.toString(TxFinishFuture.class, this, "duration", duration); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 3a3b766..26a4a91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -535,7 +536,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ public IgniteInternalFuture<Boolean> finishTxs(AffinityTopologyVersion topVer) { GridCompoundFuture<IgniteInternalTx, Boolean> res = - new GridCompoundFuture<>( + new CacheObjectsReleaseFuture<>( + "Tx", + topVer, new IgniteReducer<IgniteInternalTx, Boolean>() { @Override public boolean collect(IgniteInternalTx e) { return true; http://git-wip-us.apache.org/repos/asf/ignite/blob/a71691ae/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java index 3e08cd9..74a8f41 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridCompoundFuture.java @@ -166,7 +166,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig * @return Collection of futures. */ @SuppressWarnings("unchecked") - public synchronized final Collection<IgniteInternalFuture<T>> futures() { + public final synchronized Collection<IgniteInternalFuture<T>> futures() { if (futs == null) return Collections.emptyList(); @@ -249,7 +249,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig /** * Clear futures. */ - protected synchronized final void clear() { + protected final synchronized void clear() { futs = null; } @@ -338,7 +338,7 @@ public class GridCompoundFuture<T, R> extends GridFutureAdapter<R> implements Ig /** * @return {@code True} if has at least one future. */ - protected synchronized final boolean hasFutures() { + protected final synchronized boolean hasFutures() { return futs != null; }
