ignite-2854 Deadlock detection for pessimistic transactions
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/8f7e6504 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/8f7e6504 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/8f7e6504 Branch: refs/heads/ignite-3163 Commit: 8f7e6504f97ad8f4966dad53005a17ef2c50069d Parents: f805917 Author: agura <[email protected]> Authored: Thu May 5 18:16:31 2016 +0300 Committer: agura <[email protected]> Committed: Thu May 5 18:16:31 2016 +0300 ---------------------------------------------------------------------- .../java/org/apache/ignite/IgniteCache.java | 197 ++++-- .../apache/ignite/IgniteSystemProperties.java | 11 + .../org/apache/ignite/internal/GridTopic.java | 5 +- .../communication/GridIoMessageFactory.java | 28 +- .../processors/cache/GridCacheMapEntry.java | 16 + .../processors/cache/GridCacheMvcc.java | 7 + .../processors/cache/GridCacheMvccManager.java | 20 +- .../cache/GridCacheSharedContext.java | 2 +- .../distributed/dht/GridDhtLockFuture.java | 22 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 10 +- .../colocated/GridDhtColocatedLockFuture.java | 71 ++- .../distributed/near/GridNearLockFuture.java | 3 +- .../cache/distributed/near/GridNearTxLocal.java | 7 +- .../near/GridNearTxPrepareFutureAdapter.java | 1 + .../cache/local/GridLocalLockFuture.java | 55 +- .../cache/transactions/IgniteTxAdapter.java | 17 +- .../transactions/IgniteTxLocalAdapter.java | 44 +- .../cache/transactions/IgniteTxManager.java | 413 ++++++++++++- .../cache/transactions/TxDeadlock.java | 159 +++++ .../cache/transactions/TxDeadlockDetection.java | 599 +++++++++++++++++++ .../processors/cache/transactions/TxLock.java | 225 +++++++ .../cache/transactions/TxLockList.java | 134 +++++ .../cache/transactions/TxLocksRequest.java | 205 +++++++ .../cache/transactions/TxLocksResponse.java | 318 ++++++++++ .../ignite/internal/util/IgniteUtils.java | 4 + .../apache/ignite/transactions/Transaction.java | 19 +- .../TransactionDeadlockException.java | 42 ++ .../TransactionTimeoutException.java | 5 +- .../transactions/DepthFirstSearchTest.java | 252 ++++++++ .../transactions/TxDeadlockDetectionTest.java | 495 +++++++++++++++ ...simisticDeadlockDetectionCrossCacheTest.java | 165 +++++ .../TxPessimisticDeadlockDetectionTest.java | 487 +++++++++++++++ ...naryObjectsTxDeadlockDetectionTestSuite.java | 37 ++ .../TxDeadlockDetectionTestSuite.java | 44 ++ 34 files changed, 4012 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/IgniteCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java index a791e38..3af2c44 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteCache.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteCache.java @@ -58,6 +58,9 @@ import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.mxbean.CacheMetricsMXBean; +import org.apache.ignite.transactions.TransactionHeuristicException; +import org.apache.ignite.transactions.TransactionRollbackException; +import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; /** @@ -233,6 +236,9 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * previous value). * @throws NullPointerException If either key or value are {@code null}. * @throws CacheException If put operation failed. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. */ @IgniteAsyncSupported public V getAndPutIfAbsent(K key, V val) throws CacheException; @@ -390,12 +396,20 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * if any, defined by the {@link EntryProcessor} implementation. No mappings * will be returned for {@link EntryProcessor}s that return a * <code>null</code> value for a key. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. */ @IgniteAsyncSupported public <T> Map<K, EntryProcessorResult<T>> invokeAll(Map<? extends K, ? extends EntryProcessor<K, V, T>> map, Object... args); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public V get(K key); @@ -406,19 +420,27 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * because the entry is missing from the cache, the Cache's {@link CacheLoader} * is called in an attempt to load the entry. * - * @param key the key whose associated value is to be returned - * @return the element, or null, if it does not exist. - * @throws IllegalStateException if the cache is {@link #isClosed()} - * @throws NullPointerException if the key is null - * @throws CacheException if there is a problem fetching the value - * @throws ClassCastException if the implementation is configured to perform + * @param key The key whose associated value is to be returned. + * @return The element, or null, if it does not exist. + * @throws IllegalStateException If the cache is {@link #isClosed()}. + * @throws NullPointerException If the key is {@code null}. + * @throws CacheException If there is a problem fetching the value. + * @throws ClassCastException If the implementation is configured to perform * runtime-type-checking, and the key or value types are incompatible with those that have been - * configured for the {@link Cache} + * configured for the {@link Cache}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. */ @IgniteAsyncSupported public CacheEntry<K, V> getEntry(K key); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public Map<K, V> getAll(Set<? extends K> keys); @@ -434,12 +456,15 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * @param keys The keys whose associated values are to be returned. * @return A collection of entries that were found for the given keys. Entries not found * in the cache are not in the returned collection. - * @throws NullPointerException if keys is null or if keys contains a null - * @throws IllegalStateException if the cache is {@link #isClosed()} - * @throws CacheException if there is a problem fetching the values - * @throws ClassCastException if the implementation is configured to perform + * @throws NullPointerException If keys is null or if keys contains a {@code null}. + * @throws IllegalStateException If the cache is {@link #isClosed()}. + * @throws CacheException If there is a problem fetching the values. + * @throws ClassCastException If the implementation is configured to perform * runtime-type-checking, and the key or value types are incompatible with those that have been - * configured for the {@link Cache} + * configured for the {@link Cache}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. */ @IgniteAsyncSupported public Collection<CacheEntry<K, V>> getEntries(Set<? extends K> keys); @@ -454,7 +479,12 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS @IgniteAsyncSupported public Map<K, V> getAllOutTx(Set<? extends K> keys); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public boolean containsKey(K key); @@ -463,51 +493,109 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * * @param keys Key whose presence in this cache is to be tested. * @return {@code True} if this cache contains a mapping for the specified keys. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. */ @IgniteAsyncSupported public boolean containsKeys(Set<? extends K> keys); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public void put(K key, V val); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public V getAndPut(K key, V val); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public void putAll(Map<? extends K, ? extends V> map); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public boolean putIfAbsent(K key, V val); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public boolean remove(K key); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public boolean remove(K key, V oldVal); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public V getAndRemove(K key); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public boolean replace(K key, V oldVal, V newVal); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public boolean replace(K key, V val); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public V getAndReplace(K key, V val); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public void removeAll(Set<? extends K> keys); @@ -591,7 +679,12 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS */ public void localClearAll(Set<? extends K> keys); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public <T> T invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... arguments); @@ -605,28 +698,34 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always * the same. * - * @param key the key to the entry - * @param entryProcessor the {@link CacheEntryProcessor} to invoke - * @param arguments additional arguments to pass to the - * {@link CacheEntryProcessor} - * @return the result of the processing, if any, defined by the - * {@link CacheEntryProcessor} implementation - * @throws NullPointerException if key or {@link CacheEntryProcessor} is null - * @throws IllegalStateException if the cache is {@link #isClosed()} - * @throws ClassCastException if the implementation is configured to perform + * @param key The key to the entry. + * @param entryProcessor The {@link CacheEntryProcessor} to invoke. + * @param arguments Additional arguments to pass to the {@link CacheEntryProcessor}. + * @return The result of the processing, if any, defined by the {@link CacheEntryProcessor} implementation. + * @throws NullPointerException If key or {@link CacheEntryProcessor} is null + * @throws IllegalStateException If the cache is {@link #isClosed()} + * @throws ClassCastException If the implementation is configured to perform * runtime-type-checking, and the key or value * types are incompatible with those that have been - * configured for the {@link Cache} - * @throws EntryProcessorException if an exception is thrown by the {@link + * configured for the {@link Cache}. + * @throws EntryProcessorException If an exception is thrown by the {@link * CacheEntryProcessor}, a Caching Implementation * must wrap any {@link Exception} thrown * wrapped in an {@link EntryProcessorException}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. * @see CacheEntryProcessor */ @IgniteAsyncSupported public <T> T invoke(K key, CacheEntryProcessor<K, V, T> entryProcessor, Object... arguments); - /** {@inheritDoc} */ + /** + * {@inheritDoc} + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. + */ @IgniteAsyncSupported @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, EntryProcessor<K, V, T> entryProcessor, Object... args); @@ -655,20 +754,22 @@ public interface IgniteCache<K, V> extends javax.cache.Cache<K, V>, IgniteAsyncS * backup nodes in the cache. It is guaranteed that the value passed to the entry processor will be always * the same. * - * @param keys the set of keys for entries to process - * @param entryProcessor the {@link CacheEntryProcessor} to invoke - * @param args additional arguments to pass to the - * {@link CacheEntryProcessor} - * @return the map of {@link EntryProcessorResult}s of the processing per key, + * @param keys The set of keys for entries to process. + * @param entryProcessor The {@link CacheEntryProcessor} to invoke. + * @param args Additional arguments to pass to the {@link CacheEntryProcessor}. + * @return The map of {@link EntryProcessorResult}s of the processing per key, * if any, defined by the {@link CacheEntryProcessor} implementation. No mappings * will be returned for {@link CacheEntryProcessor}s that return a * <code>null</code> value for a key. - * @throws NullPointerException if keys or {@link CacheEntryProcessor} are null - * @throws IllegalStateException if the cache is {@link #isClosed()} - * @throws ClassCastException if the implementation is configured to perform + * @throws NullPointerException If keys or {@link CacheEntryProcessor} are {#code null}. + * @throws IllegalStateException If the cache is {@link #isClosed()}. + * @throws ClassCastException If the implementation is configured to perform * runtime-type-checking, and the key or value * types are incompatible with those that have been - * configured for the {@link Cache} + * configured for the {@link Cache}. + * @throws TransactionTimeoutException If operation performs within transaction and timeout occurred. + * @throws TransactionRollbackException If operation performs within transaction that automatically rolled back. + * @throws TransactionHeuristicException If operation performs within transaction that entered an unknown state. * @see CacheEntryProcessor */ @IgniteAsyncSupported http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 095d199..2b643b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -188,6 +188,17 @@ public final class IgniteSystemProperties { public static final String IGNITE_TX_SALVAGE_TIMEOUT = "IGNITE_TX_SALVAGE_TIMEOUT"; /** + * Specifies maximum number of iterations for deadlock detection procedure. + * If value of this property is less then or equal to zero then deadlock detection will be disabled. + */ + public static final String IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS = "IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS"; + + /** + * Specifies timeout for deadlock detection procedure. + */ + public static final String IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT = "IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT"; + + /** * System property to override multicast group taken from configuration. * Used for testing purposes. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java index fbf2b18..248f75b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java @@ -91,7 +91,10 @@ public enum GridTopic { TOPIC_HADOOP, /** */ - TOPIC_QUERY; + TOPIC_QUERY, + + /** */ + TOPIC_TX; /** Enum values. */ private static final GridTopic[] VALS = values(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 3c7f378..5f60215 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -26,13 +26,13 @@ import org.apache.ignite.internal.GridJobSiblingsRequest; import org.apache.ignite.internal.GridJobSiblingsResponse; import org.apache.ignite.internal.GridTaskCancelRequest; import org.apache.ignite.internal.GridTaskSessionRequest; +import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; +import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointRequest; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.deployment.GridDeploymentRequest; import org.apache.ignite.internal.managers.deployment.GridDeploymentResponse; import org.apache.ignite.internal.managers.eventstorage.GridEventStorageMessage; -import org.apache.ignite.internal.binary.BinaryEnumObjectImpl; -import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection; import org.apache.ignite.internal.processors.cache.CacheEntryPredicateContainsValue; @@ -99,6 +99,10 @@ import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinu import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TxEntryValueHolder; +import org.apache.ignite.internal.processors.cache.transactions.TxLock; +import org.apache.ignite.internal.processors.cache.transactions.TxLockList; +import org.apache.ignite.internal.processors.cache.transactions.TxLocksRequest; +import org.apache.ignite.internal.processors.cache.transactions.TxLocksResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; @@ -156,6 +160,26 @@ public class GridIoMessageFactory implements MessageFactory { Message msg = null; switch (type) { + case -26: + msg = new TxLockList(); + + break; + + case -25: + msg = new TxLock(); + + break; + + case -24: + msg = new TxLocksRequest(); + + break; + + case -23: + msg = new TxLocksResponse(); + + break; + case TcpCommunicationSpi.NODE_ID_MSG_TYPE: msg = new TcpCommunicationSpi.NodeIdMessage(); http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 45be26c..0f7482a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -18,8 +18,10 @@ package org.apache.ignite.internal.processors.cache; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -4324,6 +4326,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** + * @return All MVCC local candidates. + */ + @Nullable public synchronized List<GridCacheMvccCandidate> mvccAllLocal() { + GridCacheMvcc mvcc = extras != null ? extras.mvcc() : null; + + if (mvcc == null) + return null; + + List<GridCacheMvccCandidate> locs = mvcc.allLocal(); + + return (locs == null || locs.isEmpty()) ? null : new ArrayList<>(locs); + } + + /** * @param mvcc MVCC. */ protected void mvccExtras(@Nullable GridCacheMvcc mvcc) { http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index adcbf92..507a2c9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -1357,6 +1357,13 @@ public final class GridCacheMvcc { } /** + * @return Local MVCC candidates. + */ + @Nullable public List<GridCacheMvccCandidate> allLocal() { + return locs; + } + + /** * @param ver Version to check for ownership. * @return {@code True} if lock is owned by the specified version. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index fb05ca5..43609eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -30,7 +30,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException; @@ -664,6 +663,25 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** + * Gets futures for given lock ID. + * + * @param ver Lock ID. + * @return Futures. + */ + @SuppressWarnings({"unchecked"}) + @Nullable public Collection<GridCacheMvccFuture<?>> mvccFutures(GridCacheVersion ver) { + Collection<GridCacheMvccFuture<?>> futs = this.mvccFuts.get(ver); + + if (futs != null) { + synchronized (futs) { + return new ArrayList<>(futs); + } + } + + return null; + } + + /** * @param futId Future ID. * @return Found future. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 2221d3b..ef271f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -467,7 +467,7 @@ public class GridCacheSharedContext<K, V> { * @param nodeId Node ID. * @return Node or {@code null}. */ - public ClusterNode node(UUID nodeId) { + @Nullable public ClusterNode node(UUID nodeId) { return kernalCtx.discovery().node(nodeId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 80f35c1..5659436 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -132,7 +132,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> private LockTimeoutObject timeoutObj; /** Lock timeout. */ - private long timeout; + private final long timeout; /** Filter. */ private CacheEntryPredicate[] filter; @@ -199,6 +199,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> assert nearNodeId != null; assert nearLockVer != null; assert topVer.topologyVersion() > 0; + assert (tx != null && timeout >= 0) || tx == null; this.cctx = cctx; this.nearNodeId = nearNodeId; @@ -482,7 +483,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> private void onFailed(boolean dist) { undoLocks(dist); - onComplete(false, false); + onComplete(false, false, true); } /** @@ -628,7 +629,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> err = t; } - onComplete(false, false); + onComplete(false, false, true); } /** @@ -691,7 +692,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> /** {@inheritDoc} */ @Override public boolean cancel() { if (onCancelled()) - onComplete(false, false); + onComplete(false, false, true); return isCancelled(); } @@ -721,7 +722,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> this.err = err; } - return onComplete(success, err instanceof NodeStoppingException); + return onComplete(success, err instanceof NodeStoppingException, true); } /** @@ -729,13 +730,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> * * @param success {@code True} if lock was acquired. * @param stopping {@code True} if node is stopping. + * @param unlock {@code True} if locks should be released. * @return {@code True} if complete by this operation. */ - private boolean onComplete(boolean success, boolean stopping) { + private boolean onComplete(boolean success, boolean stopping, boolean unlock) { if (log.isDebugEnabled()) log.debug("Received onComplete(..) callback [success=" + success + ", fut=" + this + ']'); - if (!success && !stopping) + if (!success && !stopping && unlock) undoLocks(true); boolean set = false; @@ -784,7 +786,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> */ public void map() { if (F.isEmpty(entries)) { - onComplete(true, false); + onComplete(true, false, true); return; } @@ -1088,7 +1090,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> timedOut = true; - onComplete(false, false); + boolean releaseLocks = !(inTx() && cctx.tm().deadlockDetectionEnabled()); + + onComplete(false, false, releaseLocks); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 534a560..cdbb6bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -690,10 +690,16 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (passedKeys.isEmpty()) return new GridFinishedFuture<>(ret); - GridDhtTransactionalCacheAdapter<?, ?> dhtCache = cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); + GridDhtTransactionalCacheAdapter<?, ?> dhtCache = + cacheCtx.isNear() ? cacheCtx.nearTx().dht() : cacheCtx.dhtTx(); + + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); IgniteInternalFuture<Boolean> fut = dhtCache.lockAllAsyncInternal(passedKeys, - lockTimeout(), + timeout, this, isInvalidate(), read, http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index a5f5286..dc55eb5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -21,8 +21,10 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Deque; +import java.util.HashSet; import java.util.Iterator; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; @@ -50,8 +52,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -65,7 +69,9 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -451,6 +457,28 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } /** + * @return Keys for which locks requested from remote nodes but response isn't received. + */ + public Set<KeyCacheObject> requestedKeys() { + Set<KeyCacheObject> requestedKeys = null; + + for (IgniteInternalFuture<Boolean> miniFut : futures()) { + if (isMini(miniFut) && !miniFut.isDone()) { + if (requestedKeys == null) + requestedKeys = new HashSet<>(); + + MiniFuture mini = (MiniFuture)miniFut; + + requestedKeys.addAll(mini.keys); + + return requestedKeys; + } + } + + return requestedKeys; + } + + /** * Finds pending mini future by the given mini ID. * * @param miniId Mini ID to find. @@ -502,6 +530,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (log.isDebugEnabled()) log.debug("Received onDone(..) callback [success=" + success + ", err=" + err + ", fut=" + this + ']'); + // Local GridDhtLockFuture + if (inTx() && this.err instanceof IgniteTxTimeoutCheckedException && cctx.tm().deadlockDetectionEnabled()) + return false; + if (isDone()) return false; @@ -1288,7 +1320,36 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture if (log.isDebugEnabled()) log.debug("Timed out waiting for lock response: " + this); - onComplete(false, true); + if (inTx() && cctx.tm().deadlockDetectionEnabled()) { + Set<IgniteTxKey> keys = new HashSet<>(); + + for (IgniteTxEntry txEntry : tx.allEntries()) { + if (!txEntry.locked()) + keys.add(txEntry.txKey()); + } + + IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() { + @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) { + try { + TxDeadlock deadlock = fut.get(); + + if (deadlock != null) + err = new TransactionDeadlockException(deadlock.toString(cctx.shared())); + } + catch (IgniteCheckedException e) { + err = e; + + U.warn(log, "Failed to detect deadlock.", e); + } + + onComplete(false, true); + } + }); + } + else + onComplete(false, true); } /** {@inheritDoc} */ @@ -1310,11 +1371,11 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** Node ID. */ @GridToStringExclude - private ClusterNode node; + private final ClusterNode node; /** Keys. */ @GridToStringInclude - private Collection<KeyCacheObject> keys; + private final Collection<KeyCacheObject> keys; /** Mappings to proceed. */ @GridToStringExclude @@ -1394,6 +1455,10 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } if (res.error() != null) { + if (inTx() && res.error() instanceof IgniteTxTimeoutCheckedException && + cctx.tm().deadlockDetectionEnabled()) + return; + if (log.isDebugEnabled()) log.debug("Finishing mini future with an error due to error in response [miniFut=" + this + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 55c5ab6..d5c0133 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -118,7 +118,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean private LockTimeoutObject timeoutObj; /** Lock timeout. */ - private long timeout; + private final long timeout; /** Filter. */ private final CacheEntryPredicate[] filter; @@ -180,6 +180,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean super(cctx.kernalContext(), CU.boolReducer()); assert keys != null; + assert (tx != null && timeout >= 0) || tx == null; this.cctx = cctx; this.keys = keys; http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 515d284..3ff165b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -1171,8 +1171,13 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock on keys: " + keys); + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + IgniteInternalFuture<Boolean> fut = cacheCtx.colocated().lockAllAsyncInternal(keys, - lockTimeout(), + timeout, this, isInvalidate(), read, http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java index 52cad91..6992aa5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareFutureAdapter.java @@ -73,6 +73,7 @@ public abstract class GridNearTxPrepareFutureAdapter extends protected GridCacheSharedContext<?, ?> cctx; /** Future ID. */ + @GridToStringInclude protected IgniteUuid futId; /** Transaction. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java index 9f53c18..7868c79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalLockFuture.java @@ -19,11 +19,14 @@ package org.apache.ignite.internal.processors.cache.local; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -31,14 +34,18 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; +import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -86,7 +93,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> private LockTimeoutObject timeoutObj; /** Lock timeout. */ - private long timeout; + private final long timeout; /** Filter. */ private CacheEntryPredicate[] filter; @@ -114,6 +121,7 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> CacheEntryPredicate[] filter) { assert keys != null; assert cache != null; + assert (tx != null && timeout >= 0) || tx == null; this.cctx = cctx; this.cache = cache; @@ -435,12 +443,53 @@ public final class GridLocalLockFuture<K, V> extends GridFutureAdapter<Boolean> } /** {@inheritDoc} */ - @SuppressWarnings({"ThrowableInstanceNeverThrown"}) + @SuppressWarnings({"ThrowableInstanceNeverThrown", "ForLoopReplaceableByForEach"}) @Override public void onTimeout() { if (log.isDebugEnabled()) log.debug("Timed out waiting for lock response: " + this); - onComplete(false); + if (inTx() && cctx.tm().deadlockDetectionEnabled()) { + Set<IgniteTxKey> keys = new HashSet<>(); + + List<GridLocalCacheEntry> entries = entries(); + + for (int i = 0; i < entries.size(); i++) { + GridLocalCacheEntry e = entries.get(i); + + List<GridCacheMvccCandidate> mvcc = e.mvccAllLocal(); + + if (mvcc == null) + continue; + + GridCacheMvccCandidate cand = mvcc.get(0); + + if (cand.owner() && cand.tx() && !cand.version().equals(tx.xidVersion())) + keys.add(e.txKey()); + } + + IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() { + @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) { + try { + TxDeadlock deadlock = fut.get(); + + if (deadlock != null) + err.compareAndSet(null, + new TransactionDeadlockException(deadlock.toString(cctx.shared()))); + } + catch (IgniteCheckedException e) { + err.compareAndSet(null, e); + + U.warn(log, "Failed to detect deadlock.", e); + } + + onComplete(false); + } + }); + } + else + onComplete(false); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 34f1fa4..3286689 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -697,27 +697,24 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** * Gets remaining allowed transaction time. * - * @return Remaining transaction time. + * @return Remaining transaction time. {@code 0} if timeout isn't specified. {@code -1} if time is out. */ @Override public long remainingTime() { if (timeout() <= 0) - return -1; + return 0; long timeLeft = timeout() - (U.currentTimeMillis() - startTime()); - if (timeLeft < 0) - return 0; + return timeLeft <= 0 ? -1 : timeLeft; - return timeLeft; } /** - * @return Lock timeout. + * @return Transaction timeout exception. */ - protected long lockTimeout() { - long timeout = remainingTime(); - - return timeout < 0 ? 0 : timeout == 0 ? -1 : timeout; + protected final IgniteCheckedException timeoutException() { + return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " + + "for transaction [timeout=" + timeout() + ", tx=" + this + ']'); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index dc1ec43..962f2d5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -67,6 +67,7 @@ import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.internal.util.GridLeanMap; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -1833,8 +1834,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter long accessTtl = expiryPlc != null ? CU.toTtl(expiryPlc.getExpiryForAccess()) : CU.TTL_NOT_CHANGED; + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(lockKeys, - lockTimeout(), + timeout, this, true, true, @@ -3095,8 +3101,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock for put on key: " + enlisted); + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, - lockTimeout(), + timeout, this, false, retval, @@ -3270,8 +3281,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock for put on keys: " + enlisted); + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, - lockTimeout(), + timeout, this, false, retval, @@ -3552,8 +3568,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (log.isDebugEnabled()) log.debug("Before acquiring transaction lock for remove on keys: " + enlisted); + long timeout = remainingTime(); + + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, - lockTimeout(), + timeout, this, false, retval, @@ -3707,7 +3728,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter throw new IgniteCheckedException("Cache transaction marked as rollback-only: " + this); } - if (remainingTime() == 0 && setRollbackOnly()) + if (remainingTime() == -1 && setRollbackOnly()) throw new IgniteTxTimeoutCheckedException("Cache transaction timed out " + "(was rolled back automatically): " + this); } @@ -4070,7 +4091,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public final IgniteInternalFuture<T> apply(Boolean locked, @Nullable final Exception e) { - if (e != null) { + TransactionDeadlockException deadlockErr = X.cause(e, TransactionDeadlockException.class); + + if (e != null && deadlockErr == null) { setRollbackOnly(); if (commit && commitAfterLock()) @@ -4083,12 +4106,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter throw new GridClosureException(e); } - if (!locked) { + if (deadlockErr != null || !locked) { setRollbackOnly(); - final GridClosureException ex = new GridClosureException(new IgniteTxTimeoutCheckedException("Failed to " + - "acquire lock within provided timeout for transaction [timeout=" + timeout() + - ", tx=" + this + ']')); + final GridClosureException ex = new GridClosureException( + new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " + + "for transaction [timeout=" + timeout() + ", tx=" + this + ']', deadlockErr) + ); if (commit && commitAfterLock()) return rollbackAsync().chain(new C1<IgniteInternalFuture<IgniteInternalTx>, T>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 5dcd53d..6e8f9fa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -32,17 +33,25 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.binary.BinaryObjectException; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.Event; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.managers.communication.GridIoPolicy; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; +import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxFinishSync; import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryFuture; @@ -51,9 +60,11 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; +import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.TxDeadlockFuture; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; @@ -68,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; @@ -79,9 +91,12 @@ import org.jsr166.ConcurrentLinkedHashMap; import static org.apache.ignite.IgniteSystemProperties.IGNITE_MAX_COMPLETED_TX_COUNT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SLOW_TX_WARN_TIMEOUT; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS; import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_SALVAGE_TIMEOUT; import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.GridTopic.TOPIC_TX; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.RECOVERY_FINISH; import static org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx.FinalizationStatus.USER_FINISH; @@ -111,6 +126,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Tx salvage timeout (default 3s). */ private static final int TX_SALVAGE_TIMEOUT = Integer.getInteger(IGNITE_TX_SALVAGE_TIMEOUT, 100); + /** Version in which deadlock detection introduced. */ + public static final IgniteProductVersion TX_DEADLOCK_DETECTION_SINCE = IgniteProductVersion.fromString("1.5.19"); + + /** Deadlock detection maximum iterations. */ + static final int DEADLOCK_MAX_ITERS = + IgniteSystemProperties.getInteger(IGNITE_TX_DEADLOCK_DETECTION_MAX_ITERS, 1000); + /** Committing transactions. */ private final ThreadLocal<IgniteInternalTx> threadCtx = new ThreadLocal<>(); @@ -129,6 +151,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Per-ID map for near transactions. */ private final ConcurrentMap<GridCacheVersion, IgniteInternalTx> nearIdMap = newMap(); + /** Deadlock detection futures. */ + private final ConcurrentMap<Long, TxDeadlockFuture> deadlockDetectFuts = new ConcurrentHashMap8<>(); + /** TX handler. */ private IgniteTxHandler txHnd; @@ -162,6 +187,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, GridCacheVersion> mappedVers = new ConcurrentHashMap8<>(5120); + /** TxDeadlock detection. */ + private TxDeadlockDetection txDeadlockDetection; + /** {@inheritDoc} */ @Override protected void onKernalStart0(boolean reconnect) { if (reconnect) @@ -175,14 +203,23 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { DiscoveryEvent discoEvt = (DiscoveryEvent)evt; - cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(discoEvt.eventNode().id())); + UUID nodeId = discoEvt.eventNode().id(); + + cctx.time().addTimeoutObject(new NodeFailureTimeoutObject(nodeId)); if (txFinishSync != null) - txFinishSync.onNodeLeft(discoEvt.eventNode().id()); + txFinishSync.onNodeLeft(nodeId); + + for (TxDeadlockFuture fut : deadlockDetectFuts.values()) + fut.onNodeLeft(nodeId); } }, EVT_NODE_FAILED, EVT_NODE_LEFT); + this.txDeadlockDetection = new TxDeadlockDetection(cctx); + + cctx.gridIO().addMessageListener(TOPIC_TX, new DeadlockDetectionListener()); + for (IgniteInternalTx tx : idMap.values()) { if ((!tx.local() || tx.dht()) && !cctx.discovery().aliveAll(tx.masterNodeIds())) { if (log.isDebugEnabled()) @@ -194,6 +231,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** {@inheritDoc} */ + @Override protected void onKernalStop0(boolean cancel) { + cctx.gridIO().removeMessageListener(TOPIC_TX); + } + + /** {@inheritDoc} */ @Override protected void start0() throws IgniteCheckedException { txFinishSync = new GridCacheTxFinishSync<>(cctx); @@ -206,6 +248,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet()) rollbackTx(e.getValue()); + + IgniteClientDisconnectedException err = + new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."); + + for (TxDeadlockFuture fut : deadlockDetectFuts.values()) + fut.onDone(err); } /** @@ -378,7 +426,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { implicit, implicitSingle, sysCacheCtx != null, - sysCacheCtx != null ? sysCacheCtx.ioPolicy() : GridIoPolicy.SYSTEM_POOL, + sysCacheCtx != null ? sysCacheCtx.ioPolicy() : SYSTEM_POOL, concurrency, isolation, timeout, @@ -758,7 +806,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { throw new IgniteCheckedException("Transaction is marked for rollback: " + tx); } - if (tx.remainingTime() == 0) { + if (tx.remainingTime() == -1) { tx.setRollbackOnly(); throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); @@ -1434,12 +1482,12 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { throws IgniteCheckedException { assert tx.optimistic() || !tx.local(); - long remainingTime = tx.timeout() - (U.currentTimeMillis() - tx.startTime()); + long remainingTime = tx.remainingTime(); // For serializable transactions, failure to acquire lock means // that there is a serializable conflict. For all other isolation levels, // we wait for the lock. - long timeout = tx.timeout() == 0 ? 0 : (remainingTime < 0 ? 0 : remainingTime); + long timeout = remainingTime < 0 ? 0 : remainingTime; GridCacheVersion serOrder = (tx.serializable() && tx.optimistic()) ? tx.nearXidVersion() : null; @@ -1843,6 +1891,217 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * @return {@code True} if deadlock detection is enabled. + */ + public boolean deadlockDetectionEnabled() { + return DEADLOCK_MAX_ITERS > 0; + } + + /** + * Performs deadlock detection for given keys. + * + * @param tx Target tx. + * @param keys Keys. + * @return Detection result. + */ + public IgniteInternalFuture<TxDeadlock> detectDeadlock( + IgniteInternalTx tx, + Set<IgniteTxKey> keys + ) { + return txDeadlockDetection.detectDeadlock(tx, keys); + } + + /** + * @param nodeId Node ID. + * @param fut Future. + * @param txKeys Tx keys. + */ + void txLocksInfo(UUID nodeId, TxDeadlockFuture fut, Set<IgniteTxKey> txKeys) { + ClusterNode node = cctx.node(nodeId); + + if (node == null) { + if (log.isDebugEnabled()) + log.debug("Failed to finish deadlock detection, node left: " + nodeId); + + fut.onDone(); + + return; + } + + if (supportsDeadlockDetection(node)) { + TxLocksRequest req = new TxLocksRequest(fut.futureId(), txKeys); + + try { + if (!cctx.localNodeId().equals(nodeId)) + req.prepareMarshal(cctx); + + cctx.gridIO().send(node, TOPIC_TX, req, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException) { + if (log.isDebugEnabled()) + log.debug("Failed to finish deadlock detection, node left: " + nodeId); + } + else + U.warn(log, "Failed to finish deadlock detection: " + e, e); + + fut.onDone(); + } + } + else { + if (log.isDebugEnabled()) + log.debug("Failed to finish deadlock detection, node does not support deadlock detection: " + node); + + fut.onDone(); + } + } + + /** + * @param node Node. + * @return {@code True} if node supports deadlock detection protocol. + */ + private boolean supportsDeadlockDetection(ClusterNode node) { + return TX_DEADLOCK_DETECTION_SINCE.compareToIgnoreTimestamp(node.version()) <= 0; + } + + /** + * @param tx Tx. + * @param txKeys Tx keys. + * @return {@code True} if key is involved into tx. + */ + private boolean hasKeys(IgniteInternalTx tx, Collection<IgniteTxKey> txKeys) { + for (IgniteTxKey key : txKeys) { + if (tx.txState().entry(key) != null) + return true; + } + + return false; + } + + /** + * @param txKeys Tx keys. + * @return Transactions locks and nodes. + */ + private TxLocksResponse txLocksInfo(Collection<IgniteTxKey> txKeys) { + TxLocksResponse res = new TxLocksResponse(); + + Collection<IgniteInternalTx> txs = activeTransactions(); + + for (IgniteInternalTx tx : txs) { + boolean nearTxLoc = tx instanceof GridNearTxLocal; + + if (!(nearTxLoc || tx instanceof GridDhtTxLocal) || !hasKeys(tx, txKeys)) + continue; + + Collection<IgniteTxEntry> txEntries = tx.allEntries(); + + Set<KeyCacheObject> requestedKeys = null; + + // Try to get info about requested keys for detached entries in case of GridNearTxLocal transaction + // in order to reduce amount of requests to remote nodes. + if (nearTxLoc) { + GridDhtColocatedLockFuture fut = colocatedLockFuture(tx); + + if (fut != null) + requestedKeys = fut.requestedKeys(); + } + + for (IgniteTxEntry txEntry : txEntries) { + IgniteTxKey txKey = txEntry.txKey(); + + if (res.txLocks(txKey) == null) { + GridCacheMapEntry e = (GridCacheMapEntry)txEntry.cached(); + + List<GridCacheMvccCandidate> locs = e.mvccAllLocal(); + + if (locs != null) { + boolean owner = false; + + for (GridCacheMvccCandidate loc : locs) { + if (!owner && loc.owner() && loc.tx()) + owner = true; + + if (!owner) // Skip all candidates in case when no tx that owns lock. + break; + + if (loc.tx()) { + UUID nearNodeId = loc.otherNodeId(); + + GridCacheVersion txId = loc.otherVersion(); + + TxLock txLock = new TxLock( + txId == null ? loc.version() : txId, + nearNodeId == null ? loc.nodeId() : nearNodeId, + loc.threadId(), + loc.owner() ? TxLock.OWNERSHIP_OWNER : TxLock.OWNERSHIP_CANDIDATE); + + res.addTxLock(txKey, txLock); + } + } + } + // Special case for optimal sequence of nodes processing. + else if (nearTxLoc && requestedKeys != null && requestedKeys.contains(txKey.key())) { + TxLock txLock = new TxLock( + tx.nearXidVersion(), + tx.nodeId(), + tx.threadId(), + TxLock.OWNERSHIP_REQUESTED); + + res.addTxLock(txKey, txLock); + } + else + res.addKey(txKey); + } + } + } + + return res; + } + + /** + * @param tx Tx. Must be instance of {@link GridNearTxLocal}. + * @return Colocated future. + */ + private GridDhtColocatedLockFuture colocatedLockFuture(IgniteInternalTx tx) { + assert tx instanceof GridNearTxLocal : tx; + + Collection<GridCacheMvccFuture<?>> futs = cctx.mvcc().mvccFutures(tx.nearXidVersion()); + + if (futs != null) { + for (GridCacheMvccFuture<?> fut : futs) { + if (fut instanceof GridDhtColocatedLockFuture) + return (GridDhtColocatedLockFuture)fut; + } + } + + return null; + } + + /** + * @param fut Future. + */ + public void addFuture(TxDeadlockFuture fut) { + TxDeadlockFuture old = deadlockDetectFuts.put(fut.futureId(), fut); + + assert old == null : old; + } + + /** + * @param futId Future ID. + * @return Found future. + */ + @Nullable public TxDeadlockFuture future(long futId) { + return deadlockDetectFuts.get(futId); + } + + /** + * @param futId Future ID. + */ + public void removeFuture(long futId) { + deadlockDetectFuts.remove(futId); + } + + /** * Timeout object for node failure handler. */ private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { @@ -2058,4 +2317,144 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } } } + + /** + * Transactions deadlock detection process message listener. + */ + private class DeadlockDetectionListener implements GridMessageListener { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void onMessage(UUID nodeId, Object msg) { + GridCacheMessage cacheMsg = (GridCacheMessage)msg; + + unmarshall(nodeId, cacheMsg); + + if (cacheMsg.classError() != null) { + try { + processFailedMessage(nodeId, cacheMsg); + } + catch(Throwable e){ + U.error(log, "Failed to process message [senderId=" + nodeId + + ", messageType=" + cacheMsg.getClass() + ']', e); + + if (e instanceof Error) + throw (Error)e; + } + } + else { + if (log.isDebugEnabled()) + log.debug("Message received [locNodeId=" + cctx.localNodeId() + + ", rmtNodeId=" + nodeId + ", msg=" + msg + ']'); + + if (msg instanceof TxLocksRequest) { + TxLocksRequest req = (TxLocksRequest)msg; + + TxLocksResponse res = txLocksInfo(req.txKeys()); + + res.futureId(req.futureId()); + + try { + if (!cctx.localNodeId().equals(nodeId)) + res.prepareMarshal(cctx); + + cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send response to node [node=" + nodeId + ", res=" + res + ']', e); + } + } + else if (msg instanceof TxLocksResponse) { + TxLocksResponse res = (TxLocksResponse)msg; + + long futId = res.futureId(); + + TxDeadlockFuture fut = future(futId); + + if (fut != null) + fut.onResult(nodeId, res); + else + U.warn(log, "Unexpected response received " + res); + } + else + throw new IllegalArgumentException("Unknown message [msg=" + msg + ']'); + } + } + + /** + * @param nodeId Node ID. + * @param msg Message. + */ + private void processFailedMessage(UUID nodeId, GridCacheMessage msg) throws IgniteCheckedException { + switch (msg.directType()) { + case -24: { + TxLocksRequest req = (TxLocksRequest)msg; + + TxLocksResponse res = new TxLocksResponse(); + + res.futureId(req.futureId()); + + try { + cctx.gridIO().send(nodeId, TOPIC_TX, res, SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send response to node (is node still alive?) [nodeId=" + nodeId + + ", res=" + res + ']', e); + } + } + + break; + + case -23: { + TxLocksResponse res = (TxLocksResponse)msg; + + TxDeadlockFuture fut = future(res.futureId()); + + if (fut == null) { + if (log.isDebugEnabled()) + log.debug("Failed to find future for response [sender=" + nodeId + ", res=" + res + ']'); + + return; + } + + fut.onResult(nodeId, res); + } + + break; + + default: + throw new IgniteCheckedException("Failed to process message. Unsupported direct type [msg=" + + msg + ']', msg.classError()); + } + + } + + /** + * @param nodeId Sender node ID. + * @param cacheMsg Message. + */ + private void unmarshall(UUID nodeId, GridCacheMessage cacheMsg) { + if (cctx.localNodeId().equals(nodeId)) + return; + + try { + cacheMsg.finishUnmarshal(cctx, cctx.deploy().globalLoader()); + } + catch (IgniteCheckedException e) { + cacheMsg.onClassError(e); + } + catch (BinaryObjectException e) { + cacheMsg.onClassError(new IgniteCheckedException(e)); + } + catch (Error e) { + if (cacheMsg.ignoreClassErrors() && + X.hasCause(e, NoClassDefFoundError.class, UnsupportedClassVersionError.class)) { + cacheMsg.onClassError( + new IgniteCheckedException("Failed to load class during unmarshalling: " + e, e) + ); + } + else + throw e; + } + } + } }
