This is an automated email from the ASF dual-hosted git repository. irakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 9479377 IGNITE-13044 Additional possibility to check for high contending keys generated by the transaction payload. - Fixes #7824 9479377 is described below commit 947937733cc7da8d9dde81c38570aded6b57008a Author: zstan <stanilov...@gmail.com> AuthorDate: Fri May 29 20:35:56 2020 +0300 IGNITE-13044 Additional possibility to check for high contending keys generated by the transaction payload. - Fixes #7824 Signed-off-by: Ivan Rakov <ivan.glu...@gmail.com> --- .../org/apache/ignite/IgniteSystemProperties.java | 8 + .../java/org/apache/ignite/cache/CacheMetrics.java | 12 + .../org/apache/ignite/internal/IgniteFeatures.java | 10 +- .../org/apache/ignite/internal/IgniteKernal.java | 43 --- .../ignite/internal/TransactionsMXBeanImpl.java | 10 + .../cache/CacheClusterMetricsMXBeanImpl.java | 5 + .../cache/CacheLocalMetricsMXBeanImpl.java | 5 + .../processors/cache/CacheMetricsImpl.java | 61 ++++ .../processors/cache/CacheMetricsSnapshot.java | 5 + .../processors/cache/CacheMetricsSnapshotV2.java | 12 + .../cache/LongOperationsDumpSettingsClosure.java | 8 +- .../distributed/GridDistributedCacheEntry.java | 4 + .../cache/transactions/IgniteTxManager.java | 291 +++++++++++++++--- .../TxCollisionsDumpSettingsClosure.java} | 33 +- .../apache/ignite/internal/util/IgniteUtils.java | 30 ++ .../apache/ignite/mxbean/CacheMetricsMXBean.java | 17 +- .../apache/ignite/mxbean/TransactionsMXBean.java | 20 ++ .../processors/cache/CacheMetricsManageTest.java | 342 ++++++++++++++++++++- .../transactions/TxWithKeyContentionSelfTest.java | 338 ++++++++++++++++++++ .../TxWithSmallTimeoutAndContentionOneKeyTest.java | 9 + .../platform/PlatformCacheWriteMetricsTask.java | 5 + .../ignite/testsuites/IgniteCacheTestSuite7.java | 3 + 22 files changed, 1156 insertions(+), 115 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index ee5b2d1..b362a36 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1319,6 +1319,14 @@ public final class IgniteSystemProperties { public static final String IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES = "IGNITE_TCP_COMM_SET_ATTR_HOST_NAMES"; /** + * When above zero, prints tx key collisions once per interval. + * Each transaction besides OPTIMISTIC SERIALIZABLE capture locks on all enlisted keys, for some reasons + * per key lock queue may rise. This property sets the interval during which statistics are collected. + * Default is 1000 ms. + */ + public static final String IGNITE_DUMP_TX_COLLISIONS_INTERVAL = "IGNITE_DUMP_TX_COLLISIONS_INTERVAL"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { diff --git a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java index 1921dbb..09bf550 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/CacheMetrics.java @@ -21,6 +21,9 @@ import javax.cache.Cache; import javax.cache.integration.CacheLoader; import javax.cache.integration.CacheWriter; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.mxbean.TransactionsMXBean; +import org.jetbrains.annotations.NotNull; /** * Cache metrics used to obtain statistics on cache itself. @@ -708,4 +711,13 @@ public interface CacheMetrics { * @return {@code true} when cache topology is valid for writing. */ public boolean isValidForWriting(); + + /** + * Checks if there were any tx key collisions last time. 
+ * The check interval is specified through {@link IgniteSystemProperties#IGNITE_DUMP_TX_COLLISIONS_INTERVAL} or + * {@link TransactionsMXBean#setTxKeyCollisionsInterval(int)}. + * + * @return Key collisions and appropriate queue size string representation. + */ + @NotNull public String getTxKeyCollisions(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java index 0bba3f2..904d93e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteFeatures.java @@ -98,8 +98,14 @@ public enum IgniteFeatures { /** Persistence caches can be snapshot. */ PERSISTENCE_CACHE_SNAPSHOT(23), - /** Long operations dump timeout. */ - LONG_OPERATIONS_DUMP_TIMEOUT(30); + /** Distributed change timeout for dump long operations. */ + DISTRIBUTED_CHANGE_LONG_OPERATIONS_DUMP_TIMEOUT(30), + + /** Check secondary indexes inline size on join/by control utility request. */ + CHECK_INDEX_INLINE_SIZES(36), + + /** Distributed propagation of tx collisions dump interval. */ + DISTRIBUTED_TX_COLLISIONS_DUMP(37); /** * Unique feature identifier. 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 7138214..a0a9311 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -234,7 +234,6 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor; import org.jetbrains.annotations.Nullable; -import static java.util.Objects.nonNull; import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON; @@ -415,13 +414,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { @GridToStringExclude private GridTimeoutProcessor.CancelableTask metricsLogTask; - /** - * The instance of scheduled long operation checker. {@code null} means that the operations checker is disabled - * by the value of {@link IgniteSystemProperties#IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT} system property. - */ - @GridToStringExclude - private GridTimeoutProcessor.CancelableTask longOpDumpTask; - /** {@code true} if an error occurs at Ignite instance stop. */ @GridToStringExclude private boolean errOnStop; @@ -1525,8 +1517,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { }, metricsLogFreq, metricsLogFreq); } - scheduleLongOperationsDumpTask(ctx.cache().context().tm().longOperationsDumpTimeout()); - ctx.performance().add("Disable assertions (remove '-ea' from JVM options)", !U.assertionsEnabled()); ctx.performance().logSuggestions(log, igniteInstanceName); @@ -1551,36 +1541,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { } /** - * Scheduling tasks for dumping long operations. 
Closes current task - * (if any) and if the {@code longOpDumpTimeout > 0} schedules a new task - * with a new timeout, delay and start period equal to - * {@code longOpDumpTimeout}, otherwise task is deleted. - * - * @param longOpDumpTimeout Long operations dump timeout. - */ - public void scheduleLongOperationsDumpTask(long longOpDumpTimeout) { - if (isStopping()) - return; - - synchronized (this) { - GridTimeoutProcessor.CancelableTask task = longOpDumpTask; - - if (nonNull(task)) - task.close(); - - if (longOpDumpTimeout > 0) { - longOpDumpTask = ctx.timeout().schedule( - () -> ctx.cache().context().exchange().dumpLongRunningOperations(longOpDumpTimeout), - longOpDumpTimeout, - longOpDumpTimeout - ); - } - else - longOpDumpTask = null; - } - } - - /** * @return Ignite security processor. See {@link IgniteSecurity} for details. */ private GridProcessor securityProcessor() throws IgniteCheckedException { @@ -2639,9 +2599,6 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { if (metricsLogTask != null) metricsLogTask.close(); - if (longOpDumpTask != null) - longOpDumpTask.close(); - if (longJVMPauseDetector != null) longJVMPauseDetector.stop(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java index 5ca0a90..2e40c52 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/TransactionsMXBeanImpl.java @@ -194,6 +194,16 @@ public class TransactionsMXBeanImpl implements TransactionsMXBean { } /** {@inheritDoc} */ + @Override public void setTxKeyCollisionsInterval(int timeout) { + ctx.cache().context().tm().collisionsDumpIntervalDistributed(timeout); + } + + /** {@inheritDoc} */ + @Override public int getTxKeyCollisionsInterval() { + return ctx.cache().context().tm().collisionsDumpInterval(); + } + + /** {@inheritDoc} 
*/ @Override public String toString() { return S.toString(TransactionsMXBeanImpl.class, this); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java index cbd0b57..9403e5c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheClusterMetricsMXBeanImpl.java @@ -495,6 +495,11 @@ class CacheClusterMetricsMXBeanImpl implements CacheMetricsMXBean { } /** {@inheritDoc} */ + @Override public String getTxKeyCollisions() { + return cache.clusterMetrics().getTxKeyCollisions(); + } + + /** {@inheritDoc} */ @Override public void enableStatistics() { try { cache.context().shared().cache().enableStatistics(Collections.singleton(cache.name()), true); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java index fe67968..89c7c04 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLocalMetricsMXBeanImpl.java @@ -511,4 +511,9 @@ class CacheLocalMetricsMXBeanImpl implements CacheMetricsMXBean { throw new RuntimeException(e.getMessage()); } } + + /** {@inheritDoc} */ + @Override public String getTxKeyCollisions() { + return cache.metrics0().getTxKeyCollisions(); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java index 23a8629..c120993 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java @@ -35,8 +35,15 @@ import org.apache.ignite.internal.processors.metric.impl.MetricUtils; import org.apache.ignite.internal.util.collection.ImmutableIntSet; import org.apache.ignite.internal.util.collection.IntSet; import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -203,6 +210,9 @@ public class CacheMetricsImpl implements CacheMetrics { /** Write-behind store, if configured. */ private GridCacheWriteBehindStore store; + /** Tx collisions info. */ + private volatile Supplier<List<Map.Entry</* Colliding keys. */ GridCacheMapEntry, /* Collisions queue size. */ Integer>>> txKeyCollisionInfo; + /** * Creates cache metrics. * @@ -355,6 +365,10 @@ public class CacheMetricsImpl implements CacheMetrics { commitTime = mreg.histogram("CommitTime", HISTOGRAM_BUCKETS, "Commit time in nanoseconds."); rollbackTime = mreg.histogram("RollbackTime", HISTOGRAM_BUCKETS, "Rollback time in nanoseconds."); + + mreg.register("TxKeyCollisions", this::getTxKeyCollisions, String.class, "Tx key collisions. " + + "Show keys and collisions queue size. Due transactional payload some keys become hot. 
Metric shows " + + "corresponding keys."); } /** @@ -668,6 +682,8 @@ public class CacheMetricsImpl implements CacheMetrics { if (delegate != null) delegate.clear(); + + txKeyCollisionInfo = null; } /** {@inheritDoc} */ @@ -852,6 +868,51 @@ public class CacheMetricsImpl implements CacheMetrics { delegate.onRead(isHit); } + /** Set callback for tx key collisions detection. + * + * @param coll Key collisions info holder. + */ + public void keyCollisionsInfo(Supplier<List<Map.Entry</* Colliding keys. */ GridCacheMapEntry, /* Collisions queue size. */ Integer>>> coll) { + txKeyCollisionInfo = coll; + + if (delegate != null) + delegate.keyCollisionsInfo(coll); + } + + /** Callback representing current key collisions state. + * + * @return Key collisions info holder. + */ + public @Nullable Supplier<List<Map.Entry<GridCacheMapEntry, Integer>>> keyCollisionsInfo() { + return txKeyCollisionInfo; + } + + /** {@inheritDoc} */ + @Override public String getTxKeyCollisions() { + SB sb = null; + + Supplier<List<Map.Entry<GridCacheMapEntry, Integer>>> collInfo = keyCollisionsInfo(); + + if (collInfo != null) { + List<Map.Entry<GridCacheMapEntry, Integer>> result = collInfo.get(); + + if (!F.isEmpty(result)) { + sb = new SB(); + + for (Map.Entry<GridCacheMapEntry, Integer> info : result) { + if (sb.length() > 0) + sb.a(U.nl()); + sb.a("key="); + sb.a(info.getKey().key()); + sb.a(", queueSize="); + sb.a(info.getValue()); + } + } + } + + return sb != null ? sb.toString() : ""; + } + /** * Cache invocations caused update callback. 
* diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java index f6357a8..e4809db 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshot.java @@ -1020,6 +1020,11 @@ public class CacheMetricsSnapshot implements CacheMetrics, Externalizable { return isValidForWriting; } + /** This metric is not snapshotted: it is only meaningful as a local metric. */ + @Override public String getTxKeyCollisions() { + return ""; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsSnapshot.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java index c3a223d..d1dd6b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsSnapshotV2.java @@ -24,6 +24,7 @@ import java.util.Collection; import org.apache.ignite.cache.CacheMetrics; import org.apache.ignite.internal.dto.IgniteDataTransferObject; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; /** * Metrics snapshot. @@ -319,6 +320,9 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements */ private boolean isValidForWriting; + /** Tx key collisions with appropriate queue size string representation. */ + private String txKeyCollisions; + /** * Default constructor. 
*/ @@ -428,6 +432,7 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements rebalanceStartTime = m.rebalancingStartTime(); rebalanceFinishTime = m.estimateRebalancingFinishTime(); rebalanceClearingPartitionsLeft = m.getRebalanceClearingPartitionsLeft(); + txKeyCollisions = m.getTxKeyCollisions(); } /** @@ -1044,6 +1049,11 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements } /** {@inheritDoc} */ + @Override public String getTxKeyCollisions() { + return txKeyCollisions; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheMetricsSnapshotV2.class, this); } @@ -1124,6 +1134,7 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements out.writeBoolean(isEmpty); out.writeInt(size); out.writeInt(keySize); + U.writeLongString(out, txKeyCollisions); } /** {@inheritDoc} */ @@ -1202,5 +1213,6 @@ public class CacheMetricsSnapshotV2 extends IgniteDataTransferObject implements isEmpty = in.readBoolean(); size = in.readInt(); keySize = in.readInt(); + txKeyCollisions = U.readLongString(in); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java index edd6ebc..ed1c418 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java @@ -17,9 +17,7 @@ package org.apache.ignite.internal.processors.cache; -import org.apache.ignite.Ignite; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.resources.IgniteInstanceResource; @@ -37,7 +35,7 @@ public class LongOperationsDumpSettingsClosure implements 
IgniteRunnable { /** Auto-inject Ignite instance. */ @IgniteInstanceResource - private Ignite ignite; + private IgniteEx ignite; /** * Constructor. @@ -50,8 +48,6 @@ public class LongOperationsDumpSettingsClosure implements IgniteRunnable { /** {@inheritDoc} */ @Override public void run() { - ((IgniteEx)ignite).context().cache().context().tm().longOperationsDumpTimeout(longOpsDumpTimeout); - - ((IgniteKernal)ignite).scheduleLongOperationsDumpTask(longOpsDumpTimeout); + ignite.context().cache().context().tm().longOperationsDumpTimeout(longOpsDumpTimeout); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index 3f0300f..7a8bab4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -312,6 +312,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { CacheObject val; + cctx.tm().detectPossibleCollidingKeys(this); + lockEntry(); try { @@ -372,6 +374,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { CacheObject val; + cctx.tm().detectPossibleCollidingKeys(this); + lockEntry(); try { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 189328d..3219aa1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -21,7 +21,9 @@ import java.io.Externalizable; import java.util.ArrayList; import 
java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -29,15 +31,15 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; + import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; -import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryObjectException; -import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.TransactionConfiguration; import org.apache.ignite.events.DiscoveryEvent; @@ -57,6 +59,7 @@ import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord; import org.apache.ignite.internal.pagemem.wal.record.TxRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObjectsReleaseFuture; +import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; @@ -73,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.TxOwnerDumpRequestAllowedSett import org.apache.ignite.internal.processors.cache.TxTimeoutOnPartitionMapExchangeChangeMessage; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; 
+import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxOnePhaseCommitAckRequest; @@ -90,6 +94,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetect import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.BaselineTopology; import org.apache.ignite.internal.processors.metric.impl.HitRateMetric; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; @@ -109,7 +114,6 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteReducer; -import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.spi.systemview.view.TransactionView; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -118,8 +122,10 @@ import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedHashMap; +import static java.util.Objects.nonNull; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_BUFFER_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFERRED_ONE_PHASE_COMMIT_ACK_REQUEST_TIMEOUT; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL; import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_TRANSACTION_TIME_DUMP_THRESHOLD; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; @@ -136,9 +142,10 @@ import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; import static org.apache.ignite.events.EventType.EVT_TX_STARTED; import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE_COORDINATOR; import static org.apache.ignite.internal.GridTopic.TOPIC_TX; -import static org.apache.ignite.internal.IgniteFeatures.LONG_OPERATIONS_DUMP_TIMEOUT; +import static org.apache.ignite.internal.IgniteFeatures.DISTRIBUTED_CHANGE_LONG_OPERATIONS_DUMP_TIMEOUT; import static org.apache.ignite.internal.IgniteFeatures.LRT_SYSTEM_USER_TIME_DUMP_SETTINGS; import static org.apache.ignite.internal.IgniteFeatures.TRANSACTION_OWNER_THREAD_DUMP_PROVIDING; +import static org.apache.ignite.internal.IgniteFeatures.DISTRIBUTED_TX_COLLISIONS_DUMP; import static org.apache.ignite.internal.IgniteKernal.DFLT_LONG_OPERATIONS_DUMP_TIMEOUT; import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.READ; @@ -147,6 +154,7 @@ import static org.apache.ignite.internal.processors.cache.transactions.IgniteInt import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH; import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.internal.util.IgniteUtils.broadcastToNodesSupportingFeature; import static org.apache.ignite.transactions.TransactionState.ACTIVE; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; @@ -187,6 +195,13 @@ 
public class IgniteTxManager extends GridCacheSharedManagerAdapter { static int DEADLOCK_MAX_ITERS = IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000); + /** Collisions dump interval. */ + private volatile int collisionsDumpInterval = + IgniteSystemProperties.getInteger(IGNITE_DUMP_TX_COLLISIONS_INTERVAL, 1000); + + /** Lower tx collisions queue size threshold. */ + private static final int COLLISIONS_QUEUE_THRESHOLD = 100; + /** Committing transactions. */ private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>(); @@ -291,6 +306,16 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private ConcurrentMap<UUID, TxTimeoutOnPartitionMapExchangeChangeFuture> txTimeoutOnPartitionMapExchangeFuts = new ConcurrentHashMap<>(); + /** Timeout operations. */ + private final Map<String, GridTimeoutProcessor.CancelableTask> timeoutOperations = + new HashMap<String, GridTimeoutProcessor.CancelableTask>() {{ + put(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, null); + put(IGNITE_DUMP_TX_COLLISIONS_INTERVAL, null); + }}; + + /** Key collisions info holder. 
*/ + private volatile KeyCollisionsHolder keyCollisionsInfo; + /** {@inheritDoc} */ @Override protected void onKernalStop0(boolean cancel) { cctx.gridIO().removeMessageListener(TOPIC_TX); @@ -373,6 +398,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { new TransactionViewWalker(), new ReadOnlyCollectionView2X<>(idMap.values(), nearIdMap.values()), TransactionView::new); + + keyCollisionsInfo = new KeyCollisionsHolder(); + + longOperationsDumpTimeout(longOperationsDumpTimeout()); + + txCollisionsDumpInterval(collisionsDumpInterval()); } /** {@inheritDoc} */ @@ -2120,6 +2151,55 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ public void longOperationsDumpTimeout(long longOpsDumpTimeout) { this.longOpsDumpTimeout = longOpsDumpTimeout; + + scheduleDumpTask( + IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT, + () -> cctx.kernalContext().cache().context().exchange().dumpLongRunningOperations(longOpsDumpTimeout), + longOpsDumpTimeout); + } + + /** + * Schedule tx collisions collection task. + * + * @param timeout Sets tx key collisions analysis interval. + **/ + void txCollisionsDumpInterval(int timeout) { + collisionsDumpInterval = timeout; + + scheduleDumpTask( + IGNITE_DUMP_TX_COLLISIONS_INTERVAL, + this::collectTxCollisionsInfo, + collisionsDumpInterval()); + } + + /** + * Scheduling tasks for dumping long operations. Closes current task + * (if any) and if the {@code timeout > 0} schedules a new task + * with a new timeout, delay and start period equal to + * {@code timeout}, otherwise task is deleted. + * + * @param taskKey Appropriate key in {@link IgniteTxManager#timeoutOperations} + * @param r Task. + * @param timeout Long operations dump timeout. 
+ */ + void scheduleDumpTask(String taskKey, Runnable r, long timeout) { + if (isStopping()) + return; + + GridTimeoutProcessor.CancelableTask longOpDumpTask; + + GridTimeoutProcessor timeoutProc = cctx.kernalContext().timeout(); + + synchronized (timeoutOperations) { + GridTimeoutProcessor.CancelableTask task = timeoutOperations.get(taskKey); + + if (nonNull(task)) + task.close(); + + longOpDumpTask = timeout > 0 ? timeoutProc.schedule(r, timeout, timeout) : null; + + timeoutOperations.put(taskKey, longOpDumpTask); + } } /** @@ -2803,20 +2883,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Setting (for all nodes) a timeout (in millis) for printing long-running - * transactions as well as transactions that cannot receive locks for all - * their keys for a long time. Set less than or equal {@code 0} to disable. - * - * @param longOpsDumpTimeout Long operations dump timeout. - */ - public void longOperationsDumpTimeoutDistributed(long longOpsDumpTimeout) { - broadcastToNodesSupportingFeature( - new LongOperationsDumpSettingsClosure(longOpsDumpTimeout), - LONG_OPERATIONS_DUMP_TIMEOUT - ); - } - - /** * Sets transaction timeout on partition map exchange. * * @param timeout Transaction timeout on partition map exchange in milliseconds. 
@@ -2894,14 +2960,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param allowed whether allowed */ public void setTxOwnerDumpRequestsAllowedDistributed(boolean allowed) { - ClusterGroup grp = cctx.kernalContext().grid() - .cluster() - .forServers() - .forPredicate(node -> IgniteFeatures.nodeSupports(node, TRANSACTION_OWNER_THREAD_DUMP_PROVIDING)); - - IgniteCompute compute = cctx.kernalContext().grid().compute(grp); - - compute.broadcast(new TxOwnerDumpRequestAllowedSettingClosure(allowed)); + broadcastToNodesSupportingFeature( + cctx.kernalContext(), + new TxOwnerDumpRequestAllowedSettingClosure(allowed), + true, + TRANSACTION_OWNER_THREAD_DUMP_PROVIDING + ); } /** @@ -2917,7 +2981,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert threshold >= 0 : "Threshold timeout must be greater than or equal to 0."; broadcastToNodesSupportingFeature( + cctx.kernalContext(), new LongRunningTxTimeDumpSettingsClosure(threshold, null, null), + false, LRT_SYSTEM_USER_TIME_DUMP_SETTINGS ); } @@ -2932,7 +2998,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert coefficient >= 0.0 && coefficient <= 1.0 : "Percentage value must be between 0.0 and 1.0 inclusively."; broadcastToNodesSupportingFeature( + cctx.kernalContext(), new LongRunningTxTimeDumpSettingsClosure(null, coefficient, null), + false, LRT_SYSTEM_USER_TIME_DUMP_SETTINGS ); } @@ -2948,25 +3016,178 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { assert limit > 0 : "Limit value must be greater than 0."; broadcastToNodesSupportingFeature( + cctx.kernalContext(), new LongRunningTxTimeDumpSettingsClosure(null, null, limit), + false, LRT_SYSTEM_USER_TIME_DUMP_SETTINGS ); } + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + super.stop0(cancel); + + synchronized (timeoutOperations) { + timeoutOperations.forEach((k, v) -> { + if (v != null) + v.close(); + }); + } + } + /** - * Broadcasts given job 
to nodes that support ignite feature. + * Setting (for all nodes) a timeout (in millis) for printing long-running + * transactions as well as transactions that cannot receive locks for all + * their keys for a long time. Set less than or equal {@code 0} to disable. * - * @param job Ignite job. - * @param feature Ignite feature. + * @param longOpsDumpTimeout Long operations dump timeout. */ - private void broadcastToNodesSupportingFeature(IgniteRunnable job, IgniteFeatures feature) { - ClusterGroup grp = cctx.kernalContext().grid() - .cluster() - .forPredicate(node -> IgniteFeatures.nodeSupports(node, feature)); + public void longOperationsDumpTimeoutDistributed(long longOpsDumpTimeout) { + broadcastToNodesSupportingFeature( + cctx.kernalContext(), + new LongOperationsDumpSettingsClosure(longOpsDumpTimeout), + false, + DISTRIBUTED_CHANGE_LONG_OPERATIONS_DUMP_TIMEOUT + ); + } - IgniteCompute compute = cctx.kernalContext().grid().compute(grp); + /** + * Returns tx keys collisions dump interval, for additional info check + * {@link IgniteSystemProperties#IGNITE_DUMP_TX_COLLISIONS_INTERVAL} description. + * + * @return Collisions dump interval. + */ + public int collisionsDumpInterval() { + return collisionsDumpInterval; + } + + /** + * Changes tx key collisions dump interval. + * For additional info check {@link IgniteSystemProperties#IGNITE_DUMP_TX_COLLISIONS_INTERVAL} description. + * + * @param collisionsDumpInterval New collisions dump interval or negative for disabling. + */ + public void collisionsDumpIntervalDistributed(int collisionsDumpInterval) { + broadcastToNodesSupportingFeature( + cctx.kernalContext(), + new TxCollisionsDumpSettingsClosure(collisionsDumpInterval), + true, + DISTRIBUTED_TX_COLLISIONS_DUMP + ); + } + + /** + * Collect queue size per key collisions info. + * + * @param key Key. 
+ * @param queueSize Collisions queue size + */ + public void pushCollidingKeysWithQueueSize(GridCacheMapEntry key, int queueSize) { + keyCollisionsInfo.put(key, queueSize); + } + + /** Wrapper for inner collect logic. */ + private void collectTxCollisionsInfo() { + keyCollisionsInfo.collectInfo(); + } + + /** + * Check local and remote candidates queue size. + * + * @param entry CacheEntry. + */ + public void detectPossibleCollidingKeys(GridDistributedCacheEntry entry) { + int qSize = entry.remoteMvccSnapshot().size(); + + try { + qSize += entry.localCandidates().size(); + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, obsolete vers found. + } + + if (qSize >= COLLISIONS_QUEUE_THRESHOLD) + pushCollidingKeysWithQueueSize(entry, qSize); + } + + /** Tx key collisions info holder. */ + private final class KeyCollisionsHolder { + /** Stripes count. */ + private final int stripesCnt = cctx.kernalContext().config().getSystemThreadPoolSize(); + + /** Max objects per store. */ + private static final int MAX_OBJS = 5; + + /** Store for keys and collisions queue sizes. */ + private final Map<GridCacheMapEntry, Integer> stores[] = new LinkedHashMap[stripesCnt]; - compute.broadcast(job); + /** Metric per cache store. */ + private final Map<GridCacheAdapter<?, ?>, List<Map.Entry<GridCacheMapEntry, Integer>>> metricPerCacheStore = + new ConcurrentHashMap<>(); + + /** Guard. */ + private final AtomicBoolean alreadyRun = new AtomicBoolean(); + + /** Constructor. */ + private KeyCollisionsHolder() { + for (int i = 0; i < stripesCnt; ++i) { + stores[i] = new LinkedHashMap<GridCacheMapEntry, Integer>() { + /** {@inheritDoc} */ + @Override protected boolean removeEldestEntry(Map.Entry<GridCacheMapEntry, Integer> eldest) { + return size() > MAX_OBJS; + } + }; + } + } + + /** + * Stores keys and values. + * + * @param key Key to store. + * @param val Value to store. 
+ */ + public void put(GridCacheMapEntry key, Integer val) { + int stripeIdx = Math.floorMod(key.hashCode(), stripesCnt); + + synchronized (stores[stripeIdx]) { + stores[stripeIdx].put(key, val); + } + } + + /** Print hot keys info. */ + private void collectInfo() { + if (!alreadyRun.compareAndSet(false, true)) + return; + + metricPerCacheStore.clear(); + + for (int i = 0; i < stripesCnt; ++i) { + synchronized (stores[i]) { + Map<GridCacheMapEntry, Integer> store = stores[i]; + + if (store.isEmpty()) + continue; + + for (Map.Entry<GridCacheMapEntry, Integer> info : store.entrySet()) { + GridCacheAdapter<Object, Object> cacheCtx = info.getKey().context().cache(); + + metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info); + } + + store.clear(); + } + } + + metricPerCacheStore.forEach((k, v) -> { + if (k.metrics0().keyCollisionsInfo() == null) { + k.metrics0().keyCollisionsInfo( + () -> metricPerCacheStore.get(k) + ); + } + }); + + alreadyRun.getAndSet(false); + } } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCollisionsDumpSettingsClosure.java similarity index 57% copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java copy to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCollisionsDumpSettingsClosure.java index edd6ebc..9c72ca8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/LongOperationsDumpSettingsClosure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxCollisionsDumpSettingsClosure.java @@ -15,43 +15,40 @@ * limitations under the License.
*/ -package org.apache.ignite.internal.processors.cache; +package org.apache.ignite.internal.processors.cache.transactions; -import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteKernal; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.resources.IgniteInstanceResource; /** - * Closure that is sent to the node in order to change - * "Long operations dump timeout" parameter and also reschedule the task for - * dumping long operations. + * Change tx collisions interval or negative for disabling. */ -public class LongOperationsDumpSettingsClosure implements IgniteRunnable { +public class TxCollisionsDumpSettingsClosure implements IgniteRunnable { /** Serialization ID. */ private static final long serialVersionUID = 0L; - /** Long operations dump timeout. */ - private final long longOpsDumpTimeout; - /** Auto-inject Ignite instance. */ @IgniteInstanceResource - private Ignite ignite; + private IgniteEx ignite; /** - * Constructor. + * Tx key collision dump interval. + * Check {@link IgniteSystemProperties#IGNITE_DUMP_TX_COLLISIONS_INTERVAL} for additional info. + */ + private final int interval; + + /** Constructor. * - * @param longOpsDumpTimeout Long operations dump timeout. + * @param timeout New interval for key collisions collection. 
*/ - public LongOperationsDumpSettingsClosure(long longOpsDumpTimeout) { - this.longOpsDumpTimeout = longOpsDumpTimeout; + TxCollisionsDumpSettingsClosure(int timeout) { + interval = timeout; } /** {@inheritDoc} */ @Override public void run() { - ((IgniteEx)ignite).context().cache().context().tm().longOperationsDumpTimeout(longOpsDumpTimeout); - - ((IgniteKernal)ignite).scheduleLongOperationsDumpTask(longOpsDumpTimeout); + ignite.context().cache().context().tm().txCollisionsDumpInterval(interval); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 8d608f1..222e0a4 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -171,6 +171,7 @@ import javax.net.ssl.X509TrustManager; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteCompute; import org.apache.ignite.IgniteDeploymentException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteIllegalStateException; @@ -179,6 +180,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryRawReader; import org.apache.ignite.binary.BinaryRawWriter; +import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterGroupEmptyException; import org.apache.ignite.cluster.ClusterMetrics; import org.apache.ignite.cluster.ClusterNode; @@ -195,6 +197,7 @@ import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; import org.apache.ignite.internal.IgniteDeploymentCheckedException; +import org.apache.ignite.internal.IgniteFeatures; import 
org.apache.ignite.internal.IgniteFutureCancelledCheckedException; import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; @@ -248,6 +251,7 @@ import org.apache.ignite.lang.IgniteFutureTimeoutException; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.lifecycle.LifecycleAware; import org.apache.ignite.marshaller.Marshaller; @@ -11888,4 +11892,30 @@ public abstract class IgniteUtils { public static int utfBytes(char c) { return (c >= 0x0001 && c <= 0x007F) ? 1 : (c > 0x07FF) ? 3 : 2; } + + /** + * Broadcasts given job to nodes that support ignite feature. + * + * @param kctx Kernal context. + * @param job Ignite job. + * @param srvrsOnly Broadcast only on server nodes. + * @param feature Ignite feature. + */ + public static void broadcastToNodesSupportingFeature( + GridKernalContext kctx, + IgniteRunnable job, + boolean srvrsOnly, + IgniteFeatures feature + ) { + ClusterGroup cl = kctx.grid().cluster(); + + if (srvrsOnly) + cl = cl.forServers(); + + ClusterGroup grp = cl.forPredicate(node -> IgniteFeatures.nodeSupports(node, feature)); + + IgniteCompute compute = kctx.grid().compute(grp); + + compute.broadcast(job); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java index fc3cacb..e6fce4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/CacheMetricsMXBean.java @@ -316,16 +316,21 @@ public interface CacheMetricsMXBean extends CacheStatisticsMXBean, CacheMXBean, public boolean isReadThrough(); /** {@inheritDoc} */ - @Override @MXBeanDescription("True when a cache is in 
write-through mode.") - public boolean isWriteThrough(); + @MXBeanDescription("True when a cache is in write-through mode.") + @Override public boolean isWriteThrough(); /** {@inheritDoc} */ - @Override @MXBeanDescription("True when a cache topology is valid for read operations.") - public boolean isValidForReading(); + @MXBeanDescription("True when a cache topology is valid for read operations.") + @Override public boolean isValidForReading(); /** {@inheritDoc} */ - @Override @MXBeanDescription("True when a cache topology is valid for write operations.") - public boolean isValidForWriting(); + @MXBeanDescription("True when a cache topology is valid for write operations.") + @Override public boolean isValidForWriting(); + + /** {@inheritDoc} */ + @MXBeanDescription("Tx key collisions. Show key and appropriate collisions queue size for the last " + + "IGNITE_DUMP_TX_COLLISIONS_INTERVAL.") + @Override public String getTxKeyCollisions(); /** * Enable statistic collection for the cache. diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java index fca905c..e6f322a 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/TransactionsMXBean.java @@ -245,4 +245,24 @@ public interface TransactionsMXBean { "receive locks for all their keys for a long time. Returns {@code 0} or less if not set." ) long getLongOperationsDumpTimeout(); + + /** + * Set timeout interval for tx key contention analysis. + * @param timeout Interval in millis. + */ + @MXBeanParametersNames("timeout") + @MXBeanDescription("Timeout interval (in millis) for printing tx key contention queue size info. Each transaction " + + "besides OPTIMISTIC SERIALIZABLE capture locks on all enlisted keys, for some reasons per key lock queue may " + + "rise. 
This property sets the interval during which keys and appropriate queue size statistics has been " + + "collected.") + void setTxKeyCollisionsInterval(int timeout); + + /** + * @return Current interval in millis. + */ + @MXBeanDescription("Returns a timeout (in millis) for printing tx key contention queue size info. Each transaction " + + "besides OPTIMISTIC SERIALIZABLE capture locks on all enlisted keys, for some reasons per key lock queue may " + + "rise. Returns the interval during which keys and appropriate queue size statistics has been " + + "collected.") + int getTxKeyCollisionsInterval(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java index 271768a..c2c297c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheMetricsManageTest.java @@ -17,37 +17,64 @@ package org.apache.ignite.internal.processors.cache; +import java.lang.management.ManagementFactory; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.CacheManager; import javax.cache.Caching; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; + import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteTransactions; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMetrics; import 
org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.TransactionsMXBeanImpl; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.mxbean.CacheMetricsMXBean; +import org.apache.ignite.mxbean.TransactionsMXBean; +import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.MvccFeatureChecker; +import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; import org.junit.Assume; import org.junit.Test; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; 
+import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; + /** * */ @@ -68,6 +95,15 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest { /** Persistence. */ private boolean persistence; + /** Use test spi flag. */ + private boolean useTestCommSpi; + + /** Backups count. */ + private int backups = -1; + + /** Client flag. */ + private boolean client; + /** * @throws Exception If failed. */ @@ -478,19 +514,35 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest { return getMxBean(getTestIgniteInstanceName(nodeIdx), cacheName, clazz.getName(), CacheMetricsMXBean.class); } - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - - CacheConfiguration cacheCfg = new CacheConfiguration() + /** Default cache config. */ + private CacheConfiguration<?, ?> getCacheConfiguration() { + CacheConfiguration<?, ?> cacheCfg = new CacheConfiguration<>() .setName(CACHE1) .setGroupName(GROUP) .setCacheMode(CacheMode.PARTITIONED) .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + if (backups != -1) + cacheCfg.setBackups(backups); + + return cacheCfg; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + CacheConfiguration<?, ?> cacheCfg = getCacheConfiguration(); + cfg.setCacheConfiguration(cacheCfg); + if (useTestCommSpi) + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + if (client) + cfg.setClientMode(client); + if (persistence) cfg.setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration( @@ -504,6 +556,286 @@ public class CacheMetricsManageTest extends GridCommonAbstractTest { return cfg; } + /** Test
correct metric for tx key contention. */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testTxContentionMetric() throws Exception { + Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-9224", MvccFeatureChecker.forcedMvcc()); + + backups = 1; + + useTestCommSpi = true; + + Ignite ig = startGridsMultiThreaded(2); + + int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 20; + + CountDownLatch txLatch = new CountDownLatch(contCnt * 2); + + CountDownLatch txLatch0 = new CountDownLatch(contCnt * 2); + + ig.cluster().active(true); + + client = true; + + Ignite cl = startGrid(); + + CacheConfiguration<?, ?> dfltCacheCfg = getCacheConfiguration(); + + dfltCacheCfg.setStatisticsEnabled(true); + + String cacheName = dfltCacheCfg.getName(); + + IgniteCache<Integer, Integer> cache = ig.cache(cacheName); + + IgniteCache<Integer, Integer> cache0 = cl.cache(cacheName); + + CacheMetricsMXBean mxBeanCache = mxBean(0, cacheName, CacheLocalMetricsMXBeanImpl.class); + + final List<Integer> priKeys = primaryKeys(cache, 3, 1); + + final Integer backKey = backupKey(cache); + + IgniteTransactions txMgr = cl.transactions(); + + CountDownLatch blockOnce = new CountDownLatch(1); + + for (Ignite ig0 : G.allGrids()) { + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); + + commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridNearTxPrepareResponse && blockOnce.getCount() > 0) { + blockOnce.countDown(); + + return true; + } + + return false; + } + }); + } + + IgniteInternalFuture f = GridTestUtils.runAsync(() -> { + try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) { + cache0.put(priKeys.get(0), 0); + cache0.put(priKeys.get(2), 0); + tx.commit(); + } + }); + + blockOnce.await(); + + GridCompoundFuture<?, 
?> finishFut = new GridCompoundFuture<>(); + + for (int i = 0; i < contCnt; ++i) { + IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> { + try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) { + cache0.put(priKeys.get(0), 0); + cache0.put(priKeys.get(1), 0); + + txLatch0.countDown(); + + tx.commit(); + + txLatch.countDown(); + } + + try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) { + cache0.put(priKeys.get(2), 0); + cache0.put(backKey, 0); + + txLatch0.countDown(); + + tx.commit(); + + txLatch.countDown(); + } + }); + + finishFut.add(f0); + } + + finishFut.markInitialized(); + + txLatch0.await(); + + for (Ignite ig0 : G.allGrids()) { + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); + + commSpi0.stopBlock(); + } + + IgniteTxManager txManager = ((IgniteEx) ig).context().cache().context().tm(); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + U.invoke(IgniteTxManager.class, txManager, "collectTxCollisionsInfo"); + } + catch (IgniteCheckedException e) { + fail(e.toString()); + } + + String coll = mxBeanCache.getTxKeyCollisions(); + + return coll.contains("val=" + priKeys.get(2)) + || coll.contains("val=" + priKeys.get(0)); + } + }, 10_000)); + + f.get(); + + finishFut.get(); + + txLatch.await(); + } + + /** Tests metric change interval.
*/ + @Test + public void testKeyCollisionsMetricDifferentTimeout() throws Exception { + Assume.assumeFalse("https://issues.apache.org/jira/browse/IGNITE-9224", MvccFeatureChecker.forcedMvcc()); + + backups = 2; + + useTestCommSpi = true; + + Ignite ig = startGridsMultiThreaded(3); + + int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5; + + CountDownLatch txLatch = new CountDownLatch(contCnt); + + ig.cluster().active(true); + + client = true; + + Ignite cl = startGrid(); + + IgniteTransactions txMgr = cl.transactions(); + + CacheConfiguration<?, ?> dfltCacheCfg = getCacheConfiguration(); + + dfltCacheCfg.setStatisticsEnabled(true); + + String cacheName = dfltCacheCfg.getName(); + + IgniteCache<Integer, Integer> cache = ig.cache(cacheName); + + IgniteCache<Integer, Integer> cache0 = cl.cache(cacheName); + + final Integer keyId = primaryKey(cache); + + CountDownLatch blockOnce = new CountDownLatch(1); + + for (Ignite ig0 : G.allGrids()) { + if (ig0.configuration().isClientMode()) + continue; + + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); + + commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) { + blockOnce.countDown(); + + return true; + } + + return false; + } + }); + } + + IgniteInternalFuture f = GridTestUtils.runAsync(() -> { + try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) { + cache0.put(keyId, 0); + tx.commit(); + } + }); + + blockOnce.await(); + + GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>(); + + for (int i = 0; i < contCnt; ++i) { + IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> { + try (Transaction tx = txMgr.txStart(PESSIMISTIC, READ_COMMITTED)) { + cache0.put(keyId, 0); + + tx.commit(); + + txLatch.countDown(); + } + }); + + finishFut.add(f0); 
+ } + + finishFut.markInitialized(); + + for (Ignite ig0 : G.allGrids()) { + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); + + if (ig0.configuration().isClientMode()) + continue; + + commSpi0.stopBlock(); + } + + CacheMetricsMXBean mxBeanCache = mxBean(0, cacheName, CacheLocalMetricsMXBeanImpl.class); + + IgniteTxManager txManager = ((IgniteEx) ig).context().cache().context().tm(); + + final TransactionsMXBean txMXBean1 = txMXBean(0); + + final TransactionsMXBean txMXBean2 = txMXBean(1); + + for (int i = 0; i < 10; ++i) { + txMXBean1.setTxKeyCollisionsInterval(ThreadLocalRandom.current().nextInt(1000, 1100)); + + txMXBean2.setTxKeyCollisionsInterval(ThreadLocalRandom.current().nextInt(1000, 1100)); + + mxBeanCache.getTxKeyCollisions(); + + mxBeanCache.clear(); + + try { + U.invoke(IgniteTxManager.class, txManager, "collectTxCollisionsInfo"); + } + catch (IgniteCheckedException e) { + fail(e.toString()); + } + + U.sleep(500); + } + + f.get(); + + finishFut.get(); + + txLatch.await(); + } + + /** + * + */ + private TransactionsMXBean txMXBean(int igniteInt) throws Exception { + ObjectName mbeanName = U.makeMBeanName(getTestIgniteInstanceName(igniteInt), "Transactions", + TransactionsMXBeanImpl.class.getSimpleName()); + + MBeanServer mbeanSrv = ManagementFactory.getPlatformMBeanServer(); + + if (!mbeanSrv.isRegistered(mbeanName)) + fail("MBean is not registered: " + mbeanName.getCanonicalName()); + + return MBeanServerInvocationHandler.newProxyInstance(mbeanSrv, mbeanName, TransactionsMXBean.class, true); + } + /** * Check cache statistics enabled/disabled flag for all nodes * diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java new file mode 100644 index 0000000..deb4795 --- /dev/null +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithKeyContentionSelfTest.java @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMetrics; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.util.future.GridCompoundFuture; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.MvccFeatureChecker; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; +import org.junit.Test; +import java.util.concurrent.CountDownLatch; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DUMP_TX_COLLISIONS_INTERVAL; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** Tests tx key contention detection functional. */ +public class TxWithKeyContentionSelfTest extends GridCommonAbstractTest { + /** Client flag. */ + private boolean client; + + /** Near cache flag. 
*/ + private boolean nearCache; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setConsistentId("NODE_" + name.substring(name.length() - 1)); + + if (client) + cfg.setClientMode(true); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(20 * 1024 * 1024) + ) + ); + + TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); + + cfg.setCommunicationSpi(commSpi); + + cfg.setCacheConfiguration(getCacheConfiguration(DEFAULT_CACHE_NAME)); + + if (client) { + cfg.setConsistentId("Client"); + + cfg.setClientMode(client); + } + + return cfg; + } + + /** */ + protected CacheConfiguration<?, ?> getCacheConfiguration(String name) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(name) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setAffinity(new RendezvousAffinityFunction(false, 16)) + .setBackups(2) + .setStatisticsEnabled(true); + + if (nearCache) + ccfg.setNearConfiguration(new NearCacheConfiguration<>()); + + return ccfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Tests transactional payload. + * + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testPessimisticRepeatableReadCheckContentionTxMetric() throws Exception { + runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * Tests transactional payload with near cache enabled. + * + * @throws Exception If failed. 
+ */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testPessimisticRepeatableReadCheckContentionTxMetricNear() throws Exception { + nearCache = true; + + runKeyCollisionsMetric(PESSIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testPessimisticReadCommitedCheckContentionTxMetric() throws Exception { + runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testPessimisticReadCommitedCheckContentionTxMetricNear() throws Exception { + nearCache = true; + + runKeyCollisionsMetric(PESSIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testOptimisticReadCommittedCheckContentionTxMetric() throws Exception { + runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testOptimisticReadCommittedCheckContentionTxMetricNear() throws Exception { + nearCache = true; + + runKeyCollisionsMetric(OPTIMISTIC, READ_COMMITTED); + } + + /** + * @throws Exception If failed. + */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testOptimisticRepeatableReadCheckContentionTxMetric() throws Exception { + runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ); + } + + /** + * @throws Exception If failed. 
+ */ + @Test + @WithSystemProperty(key = IGNITE_DUMP_TX_COLLISIONS_INTERVAL, value = "30000") + public void testOptimisticRepeatableReadCheckContentionTxMetricNear() throws Exception { + nearCache = true; + + runKeyCollisionsMetric(OPTIMISTIC, REPEATABLE_READ); + } + + /** Tests that the metric reports correct results when tx collisions have occurred. + * + * @param concurrency Concurrency level. + * @param isolation Isolation level. + * @throws Exception If failed. + */ + private void runKeyCollisionsMetric(TransactionConcurrency concurrency, TransactionIsolation isolation) throws Exception { + if (MvccFeatureChecker.forcedMvcc()) + return; // Not supported. + + Ignite ig = startGridsMultiThreaded(3); + + int contCnt = (int)U.staticField(IgniteTxManager.class, "COLLISIONS_QUEUE_THRESHOLD") * 5; + + CountDownLatch txLatch = new CountDownLatch(contCnt); + + ig.cluster().active(true); + + client = true; + + Ignite cl = startGrid(); + + IgniteTransactions txMgr = cl.transactions(); + + IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME); + + IgniteCache<Integer, Integer> cache0 = cl.cache(DEFAULT_CACHE_NAME); + + final Integer keyId = primaryKey(cache); + + CountDownLatch blockOnce = new CountDownLatch(1); + + for (Ignite ig0 : G.allGrids()) { + if (ig0.configuration().isClientMode()) + continue; + + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); + + commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message msg) { + if (msg instanceof GridNearTxFinishResponse && blockOnce.getCount() > 0) { + blockOnce.countDown(); + + return true; + } + + return false; + } + }); + } + + IgniteInternalFuture f = GridTestUtils.runAsync(() -> { + try (Transaction tx = txMgr.txStart(concurrency, isolation)) { + cache0.put(keyId, 0); + tx.commit(); + } + }); + + blockOnce.await(); + + GridCompoundFuture<?, ?> finishFut = new GridCompoundFuture<>(); + + for 
(int i = 0; i < contCnt; ++i) { + IgniteInternalFuture f0 = GridTestUtils.runAsync(() -> { + try (Transaction tx = txMgr.txStart(concurrency, isolation)) { + cache0.put(keyId, 0); + + tx.commit(); + + txLatch.countDown(); + } + }); + + finishFut.add(f0); + } + + finishFut.markInitialized(); + + for (Ignite ig0 : G.allGrids()) { + TestRecordingCommunicationSpi commSpi0 = + (TestRecordingCommunicationSpi)ig0.configuration().getCommunicationSpi(); + + if (ig0.configuration().isClientMode()) + continue; + + commSpi0.stopBlock(); + } + + IgniteTxManager txManager = ((IgniteEx) ig).context().cache().context().tm(); + + assertTrue(GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + try { + U.invoke(IgniteTxManager.class, txManager, "collectTxCollisionsInfo"); + } + catch (IgniteCheckedException e) { + fail(e.toString()); + } + + CacheMetrics metrics = ig.cache(DEFAULT_CACHE_NAME).localMetrics(); + + String coll1 = metrics.getTxKeyCollisions(); + + if (!coll1.isEmpty()) { + String coll2 = metrics.getTxKeyCollisions(); + + // check idempotent + assertEquals(coll1, coll2); + + assertTrue(coll1.contains("queueSize")); + + return true; + } + else + return false; + } + }, 10_000)); + + f.get(); + + finishFut.get(); + + txLatch.await(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java index b1e75e3..5394f80 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxWithSmallTimeoutAndContentionOneKeyTest.java @@ -94,6 +94,15 @@ public class TxWithSmallTimeoutAndContentionOneKeyTest extends GridCommonAbstrac cleanPersistenceDir(); } 
+ /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + + cleanPersistenceDir(); + } + /** * @return Random transaction type. */ diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java index faca537..5eb14b6 100644 --- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java +++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformCacheWriteMetricsTask.java @@ -415,6 +415,11 @@ public class PlatformCacheWriteMetricsTask extends ComputeTaskAdapter<Long, Obje } /** {@inheritDoc} */ + @Override public String getTxKeyCollisions() { + return ""; + } + + /** {@inheritDoc} */ @Override public int getTotalPartitionsCount() { return 54; } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java index 1824be5..0b71420 100755 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java @@ -57,6 +57,7 @@ import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheMapO import org.apache.ignite.internal.processors.cache.transactions.TxCrossCacheRemoteMultiplePartitionReservationTest; import org.apache.ignite.internal.processors.cache.transactions.TxRecoveryWithConcurrentRollbackTest; import org.apache.ignite.internal.processors.cache.transactions.TxRollbackAsyncWithPersistenceTest; +import org.apache.ignite.internal.processors.cache.transactions.TxWithKeyContentionSelfTest; import org.apache.ignite.internal.processors.cache.transactions.TxWithSmallTimeoutAndContentionOneKeyTest; import org.apache.ignite.testframework.GridTestUtils; import 
org.apache.ignite.testframework.junits.DynamicSuite; @@ -137,6 +138,8 @@ public class IgniteCacheTestSuite7 { GridTestUtils.addTestIfNeeded(suite, AtomicPartitionCounterStateConsistencyTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, AtomicPartitionCounterStateConsistencyHistoryRebalanceTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, TxWithKeyContentionSelfTest.class, ignoredTests); + return suite; } }