ignite-2968 Deadlock detection for optimistic tx and near caches
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0465874d Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0465874d Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0465874d Branch: refs/heads/master Commit: 0465874d9dddcf962a82a2ef38589121201f0b75 Parents: 2891703 Author: agura <[email protected]> Authored: Wed Aug 24 21:13:29 2016 +0300 Committer: agura <[email protected]> Committed: Mon Aug 29 16:01:16 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 19 +- .../GridCachePartitionExchangeManager.java | 7 + .../GridDistributedTxPrepareRequest.java | 4 +- .../distributed/dht/GridDhtLockFuture.java | 53 +- .../distributed/dht/GridDhtTxFinishFuture.java | 4 +- .../cache/distributed/dht/GridDhtTxLocal.java | 26 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 109 +++- .../dht/GridDhtTxPrepareRequest.java | 4 +- .../colocated/GridDhtColocatedLockFuture.java | 37 +- .../distributed/near/GridNearLockFuture.java | 90 ++- ...arOptimisticSerializableTxPrepareFuture.java | 13 +- .../near/GridNearOptimisticTxPrepareFuture.java | 263 ++++++--- ...ridNearOptimisticTxPrepareFutureAdapter.java | 5 +- .../GridNearPessimisticTxPrepareFuture.java | 8 +- .../near/GridNearTxFinishFuture.java | 5 +- .../cache/distributed/near/GridNearTxLocal.java | 16 +- .../near/GridNearTxPrepareRequest.java | 4 +- .../cache/transactions/IgniteInternalTx.java | 3 +- .../cache/transactions/IgniteTxAdapter.java | 37 +- .../cache/transactions/IgniteTxHandler.java | 9 +- .../transactions/IgniteTxLocalAdapter.java | 19 +- .../cache/transactions/IgniteTxManager.java | 86 ++- .../cache/transactions/IgniteTxStateImpl.java | 11 +- .../cache/transactions/TxDeadlockDetection.java | 51 +- .../cache/IgniteTxConfigCacheSelfTest.java | 91 ++- .../IgniteTxTimeoutAbstractTest.java | 8 +- ...tionedMultiNodeLongTxTimeoutFullApiTest.java | 34 ++ ...nabledMultiNodeLongTxTimeoutFullApiTest.java | 41 ++ .../local/GridCacheLocalTxTimeoutSelfTest.java | 5 +- .../transactions/DepthFirstSearchTest.java | 100 +++- .../TxDeadlockDetectionNoHangsTest.java | 246 ++++++++ .../transactions/TxDeadlockDetectionTest.java | 13 +- ...timisticDeadlockDetectionCrossCacheTest.java | 257 +++++++++ .../TxOptimisticDeadlockDetectionTest.java | 574 +++++++++++++++++++ ...simisticDeadlockDetectionCrossCacheTest.java | 165 ++++-- .../TxPessimisticDeadlockDetectionTest.java | 50 +- .../IgniteCacheFullApiSelfTestSuite.java | 4 + .../TxDeadlockDetectionTestSuite.java | 6 + .../commands/cache/VisorCacheStopCommand.scala | 5 +- 39 files changed, 2127 insertions(+), 355 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 57fa68e..f692bf4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -4493,17 +4493,30 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** - * @return All MVCC local candidates. + * @return All MVCC local and non near candidates. */ + @SuppressWarnings("ForLoopReplaceableByForEach") @Nullable public synchronized List<GridCacheMvccCandidate> mvccAllLocal() { GridCacheMvcc mvcc = extras != null ? extras.mvcc() : null; if (mvcc == null) return null; - List<GridCacheMvccCandidate> locs = mvcc.allLocal(); + List<GridCacheMvccCandidate> allLocs = mvcc.allLocal(); - return (locs == null || locs.isEmpty()) ? null : new ArrayList<>(locs); + if (allLocs == null || allLocs.isEmpty()) + return null; + + List<GridCacheMvccCandidate> locs = new ArrayList<>(allLocs.size()); + + for (int i = 0; i < allLocs.size(); i++) { + GridCacheMvccCandidate loc = allLocs.get(i); + + if (!loc.nearLocal()) + locs.add(loc); + } + + return locs.isEmpty() ? null : locs; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index e6ab046..4eb61e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1302,6 +1302,13 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana for (GridCacheFuture<?> fut : mvcc.atomicFutures()) U.warn(log, ">>> " + fut); + + if (tm != null) { + U.warn(log, "Pending transaction deadlock detection futures:"); + + for (IgniteInternalFuture<?> fut : tm.deadlockDetectionFutures()) + U.warn(log, ">>> " + fut); + } } for (GridCacheContext ctx : cctx.cacheContexts()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 72e68db..c691374 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -154,6 +154,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** * @param tx Cache transaction. + * @param timeout Transactions timeout. * @param reads Read entries. * @param writes Write entries. * @param txNodes Transaction nodes mapping. @@ -162,6 +163,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage */ public GridDistributedTxPrepareRequest( IgniteInternalTx tx, + long timeout, @Nullable Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes, Map<UUID, Collection<UUID>> txNodes, @@ -174,12 +176,12 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage threadId = tx.threadId(); concurrency = tx.concurrency(); isolation = tx.isolation(); - timeout = tx.timeout(); invalidate = tx.isInvalidate(); txSize = tx.size(); sys = tx.system(); plc = tx.ioPolicy(); + this.timeout = timeout; this.reads = reads; this.writes = writes; this.txNodes = txNodes; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 64b8745..b005b29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -242,12 +242,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> msgLog = cctx.shared().txLockMessageLogger(); log = U.logger(cctx.kernalContext(), logRef, GridDhtLockFuture.class); } - - if (timeout > 0) { - timeoutObj = new LockTimeoutObject(); - - cctx.time().addTimeoutObject(timeoutObj); - } } /** {@inheritDoc} */ @@ -298,8 +292,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> /** * @return Entries. */ - public synchronized Collection<GridDhtCacheEntry> entriesCopy() { - return new ArrayList<>(entries()); + public Collection<GridDhtCacheEntry> entriesCopy() { + synchronized (futs) { + return new ArrayList<>(entries()); + } } /** @@ -412,7 +408,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> return null; } - synchronized (this) { + synchronized (futs) { entries.add(c == null || c.reentry() ? null : entry); if (c != null && !c.reentry()) @@ -614,7 +610,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> * @param t Error. */ public void onError(Throwable t) { - synchronized (this) { + synchronized (futs) { if (err != null) return; @@ -654,15 +650,16 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> * @param entry Entry whose lock ownership changed. */ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { - if (isDone()) + if (isDone() || (inTx() && tx.remainingTime() == -1)) return false; // Check other futures. if (log.isDebugEnabled()) log.debug("Received onOwnerChanged() callback [entry=" + entry + ", owner=" + owner + "]"); if (owner != null && owner.version().equals(lockVer)) { - synchronized (this) { - pendingLocks.remove(entry.key()); + synchronized (futs) { + if (!pendingLocks.remove(entry.key())) + return false; } if (checkLocks()) @@ -677,8 +674,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> /** * @return {@code True} if locks have been acquired. */ - private synchronized boolean checkLocks() { - return pendingLocks.isEmpty(); + private boolean checkLocks() { + synchronized (futs) { + return pendingLocks.isEmpty(); + } } /** {@inheritDoc} */ @@ -709,7 +708,7 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (isDone() || (err == null && success && !checkLocks())) return false; - synchronized (this) { + synchronized (futs) { if (this.err == null) this.err = err; } @@ -776,13 +775,19 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> } readyLocks(); + + if (timeout > 0) { + timeoutObj = new LockTimeoutObject(); + + cctx.time().addTimeoutObject(timeoutObj); + } } /** * @param entries Entries. */ private void map(Iterable<GridDhtCacheEntry> entries) { - synchronized (this) { + synchronized (futs) { if (mapped) return; @@ -842,6 +847,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (log.isDebugEnabled()) log.debug("Mapped DHT lock future [dhtMap=" + F.nodeIds(dhtMap.keySet()) + ", dhtLockFut=" + this + ']'); + long timeout = inTx() ? tx.remainingTime() : this.timeout; + // Create mini futures. for (Map.Entry<ClusterNode, List<GridDhtCacheEntry>> mapped : dhtMap.entrySet()) { ClusterNode n = mapped.getKey(); @@ -853,6 +860,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (cnt > 0) { assert !n.id().equals(cctx.localNodeId()); + if (inTx() && tx.remainingTime() == -1) + return; + MiniFuture fut = new MiniFuture(n, dhtMapping); GridDhtLockRequest req = new GridDhtLockRequest( @@ -1109,7 +1119,14 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (log.isDebugEnabled()) log.debug("Timed out waiting for lock response: " + this); - timedOut = true; + synchronized (futs) { + timedOut = true; + + // Stop locks and responses processing. + pendingLocks.clear(); + + futs.clear(); + } boolean releaseLocks = !(inTx() && cctx.tm().deadlockDetectionEnabled()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 4ece775..d2e26b4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -133,6 +133,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur } /** {@inheritDoc} */ + @SuppressWarnings("unchecked") @Override public boolean onNodeLeft(UUID nodeId) { for (IgniteInternalFuture<?> fut : futures()) if (isMini(fut)) { @@ -391,8 +392,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param nearMap Near map. * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for. */ - private boolean finish(Map<UUID, GridDistributedTxMapping> dhtMap, - Map<UUID, GridDistributedTxMapping> nearMap) { + private boolean finish(Map<UUID, GridDistributedTxMapping> dhtMap, Map<UUID, GridDistributedTxMapping> nearMap) { if (tx.onePhaseCommit()) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index c9d4345..b659abb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -314,6 +314,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa true); } + long timeout = remainingTime(); + // For pessimistic mode we don't distribute prepare request. GridDhtTxPrepareFuture fut = prepFut; @@ -322,11 +324,16 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( cctx, this, + timeout, nearMiniId, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), true, - needReturnValue()))) + needReturnValue()))) { + if (timeout == -1) + prepFut.onError(timeoutException()); + return prepFut; + } } else // Prepare was called explicitly. @@ -334,15 +341,16 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (!state(PREPARING)) { if (setRollbackOnly()) { - if (timedOut()) - fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + this)); + if (timeout == -1) + fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + + this)); else fut.onError(new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + ", tx=" + this + ']')); } else - fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" + state() - + ", tx=" + this + ']')); + fut.onError(new IgniteTxRollbackCheckedException("Invalid transaction state for prepare [state=" + + state() + ", tx=" + this + ']')); return fut; } @@ -394,6 +402,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture fut = prepFut; + long timeout = remainingTime(); + if (fut == null) { init(); @@ -401,6 +411,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (!PREP_FUT_UPD.compareAndSet(this, null, fut = new GridDhtTxPrepareFuture( cctx, this, + timeout, nearMiniId, verMap, last, @@ -410,6 +421,9 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + "[futMiniId=" + f.nearMiniId() + ", miniId=" + nearMiniId + ", fut=" + f + ']'; + if (timeout == -1) + f.onError(timeoutException()); + return chainOnePhasePrepare(f); } } @@ -427,7 +441,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa fut.complete(); if (setRollbackOnly()) { - if (timedOut()) + if (timeout == -1) fut.onError(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + this)); else http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index e9805aa..1bdd9b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -59,8 +59,10 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -204,9 +206,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** */ private boolean invoke; + /** Timeout object. */ + private final PrepareTimeoutObject timeoutObj; + /** * @param cctx Context. * @param tx Transaction. + * @param timeout Timeout. * @param nearMiniId Near mini future id. * @param dhtVerMap DHT versions map. * @param last {@code True} if this is last prepare operation for node. @@ -215,6 +221,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter public GridDhtTxPrepareFuture( GridCacheSharedContext cctx, final GridDhtTxLocalAdapter tx, + long timeout, IgniteUuid nearMiniId, Map<IgniteTxKey, GridCacheVersion> dhtVerMap, boolean last, @@ -243,6 +250,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter assert dhtMap != null; assert nearMap != null; + + timeoutObj = timeout > 0 ? new PrepareTimeoutObject(timeout) : null; } /** {@inheritDoc} */ @@ -269,7 +278,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter boolean rmv; - synchronized (lockKeys) { + synchronized (futs) { rmv = lockKeys.remove(entry.txKey()); } @@ -300,7 +309,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (!locksReady) return false; - synchronized (lockKeys) { + synchronized (futs) { return lockKeys.isEmpty(); } } @@ -483,32 +492,28 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param res Result. */ public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) { - if (!isDone()) { - boolean found = false; - - MiniFuture mini = miniFuture(res.miniId()); + if (isDone()) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("DHT prepare fut, response for finished future [txId=" + tx.nearXidVersion() + + ", dhtTxId=" + tx.xidVersion() + + ", node=" + nodeId + + ", res=" + res + + ", fut=" + this + ']'); + } - if (mini != null) { - found = true; + return; + } - assert mini.node().id().equals(nodeId); + MiniFuture mini = miniFuture(res.miniId()); - mini.onResult(res); - } + if (mini != null) { + assert mini.node().id().equals(nodeId); - if (!found) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() + - ", dhtTxId=" + tx.xidVersion() + - ", node=" + nodeId + - ", res=" + res + - ", fut=" + this + ']'); - } - } + mini.onResult(res); } else { if (msgLog.isDebugEnabled()) { - msgLog.debug("DHT prepare fut, response for finished future [txId=" + tx.nearXidVersion() + + msgLog.debug("DHT prepare fut, failed to find mini future [txId=" + tx.nearXidVersion() + ", dhtTxId=" + tx.xidVersion() + ", node=" + nodeId + ", res=" + res + @@ -525,8 +530,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter */ @SuppressWarnings("ForLoopReplaceableByForEach") private MiniFuture miniFuture(IgniteUuid miniId) { - // We iterate directly over the futs collection here to avoid copy. synchronized (futs) { + // We iterate directly over the futs collection here to avoid copy. // Avoid iterator creation. for (int i = 0; i < futs.size(); i++) { IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i); @@ -543,9 +548,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter return null; } } - } - return null; + return null; + } } /** @@ -583,7 +588,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } if (tx.optimistic() && txEntry.explicitVersion() == null) { - synchronized (lockKeys) { + synchronized (futs) { lockKeys.add(txEntry.txKey()); } } @@ -934,6 +939,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter // Don't forget to clean up. cctx.mvcc().removeMvccFuture(this); + if (timeoutObj != null) + cctx.time().removeTimeoutObject(timeoutObj); + return true; } @@ -989,6 +997,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter readyLocks(); + if (timeoutObj != null) { + // Start timeout tracking after 'readyLocks' to avoid race with timeout processing. + cctx.time().addTimeoutObject(timeoutObj); + } + mapIfLocked(); } @@ -1158,6 +1171,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (last) { assert tx.transactionNodes() != null; + final long timeout = timeoutObj != null ? timeoutObj.timeout : 0; + // Create mini futures. for (GridDistributedTxMapping dhtMapping : tx.dhtMap().values()) { assert !dhtMapping.empty(); @@ -1175,6 +1190,9 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (F.isEmpty(dhtWrites) && F.isEmpty(nearWrites)) continue; + if (tx.remainingTime() == -1) + return; + MiniFuture fut = new MiniFuture(n.id(), dhtMapping, nearMapping); add(fut); // Append new future. @@ -1186,6 +1204,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter fut.futureId(), tx.topologyVersion(), tx, + timeout, dhtWrites, nearWrites, txNodes, @@ -1284,15 +1303,19 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { if (!tx.dhtMap().containsKey(nearMapping.node().id())) { + if (tx.remainingTime() == -1) + return; + MiniFuture fut = new MiniFuture(nearMapping.node().id(), null, nearMapping); - add(fut); // Append new future. + add(fut); GridDhtTxPrepareRequest req = new GridDhtTxPrepareRequest( futId, fut.futureId(), tx.topologyVersion(), tx, + timeout, null, nearMapping.writes(), tx.transactionNodes(), @@ -1719,4 +1742,38 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); } } + + /** + * + */ + private class PrepareTimeoutObject extends GridTimeoutObjectAdapter { + /** */ + private final long timeout; + + /** + * @param timeout Timeout. + */ + PrepareTimeoutObject(long timeout) { + super(timeout); + + this.timeout = timeout; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + synchronized (futs) { + futs.clear(); + + lockKeys.clear(); + } + + onError(new IgniteTxTimeoutCheckedException("Failed to acquire lock within " + + "provided timeout for transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']')); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(PrepareTimeoutObject.class, this); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index d31ecba..1cdc96f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -112,6 +112,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param miniId Mini future ID. * @param topVer Topology version. * @param tx Transaction. + * @param timeout Transaction timeout. * @param dhtWrites DHT writes. * @param nearWrites Near writes. * @param txNodes Transaction nodes mapping. @@ -124,6 +125,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { IgniteUuid miniId, AffinityTopologyVersion topVer, GridDhtTxLocalAdapter tx, + long timeout, Collection<IgniteTxEntry> dhtWrites, Collection<IgniteTxEntry> nearWrites, Map<UUID, Collection<UUID>> txNodes, @@ -133,7 +135,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { UUID subjId, int taskNameHash, boolean addDepInfo) { - super(tx, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo); + super(tx, timeout, null, dhtWrites, txNodes, onePhaseCommit, addDepInfo); assert futId != null; assert miniId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index f77efee..b0eea01 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -443,23 +443,33 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture /** * @return Keys for which locks requested from remote nodes but response isn't received. */ - public Set<KeyCacheObject> requestedKeys() { - Set<KeyCacheObject> requestedKeys = null; + public Set<IgniteTxKey> requestedKeys() { + synchronized (futs) { + if (timeoutObj != null && timeoutObj.requestedKeys != null) + return timeoutObj.requestedKeys; + + return requestedKeys0(); + } + } + /** + * @return Keys for which locks requested from remote nodes but response isn't received. + */ + private Set<IgniteTxKey> requestedKeys0() { for (IgniteInternalFuture<Boolean> miniFut : futures()) { if (isMini(miniFut) && !miniFut.isDone()) { - if (requestedKeys == null) - requestedKeys = new HashSet<>(); - MiniFuture mini = (MiniFuture)miniFut; - requestedKeys.addAll(mini.keys); + Set<IgniteTxKey> requestedKeys = U.newHashSet(mini.keys.size()); + + for (KeyCacheObject key : mini.keys) + requestedKeys.add(new IgniteTxKey(key, cctx.cacheId())); return requestedKeys; } } - return requestedKeys; + return null; } /** @@ -1312,12 +1322,21 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture super(timeout); } + /** Requested keys. */ + private Set<IgniteTxKey> requestedKeys; + /** {@inheritDoc} */ @Override public void onTimeout() { if (log.isDebugEnabled()) log.debug("Timed out waiting for lock response: " + this); if (inTx() && cctx.tm().deadlockDetectionEnabled()) { + synchronized (futs) { + requestedKeys = requestedKeys0(); + + futs.clear(); // Stop response processing. + } + Set<IgniteTxKey> keys = new HashSet<>(); for (IgniteTxEntry txEntry : tx.allEntries()) { @@ -1434,7 +1453,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture tx.removeMapping(node.id()); // Primary node left the grid, so fail the future. - GridDhtColocatedLockFuture.this.onDone(newTopologyException(e, node.id())); + GridDhtColocatedLockFuture.this.onDone(false, newTopologyException(e, node.id())); onDone(true); } @@ -1494,7 +1513,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture else remap(); } - else { + else { int i = 0; for (KeyCacheObject k : keys) { http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 4b6448b..3d9b6ab 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -20,10 +20,12 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; @@ -48,8 +50,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopolo import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -63,7 +67,9 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; @@ -481,6 +487,38 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } /** + * @return Keys for which locks requested from remote nodes but response isn't received. + */ + public Set<IgniteTxKey> requestedKeys() { + synchronized (futs) { + if (timeoutObj != null && timeoutObj.requestedKeys != null) + return timeoutObj.requestedKeys; + + return requestedKeys0(); + } + } + + /** + * @return Keys for which locks requested from remote nodes but response isn't received. + */ + private Set<IgniteTxKey> requestedKeys0() { + for (IgniteInternalFuture<Boolean> miniFut : futures()) { + if (isMini(miniFut) && !miniFut.isDone()) { + MiniFuture mini = (MiniFuture)miniFut; + + Set<IgniteTxKey> requestedKeys = U.newHashSet(mini.keys.size()); + + for (KeyCacheObject key : mini.keys) + requestedKeys.add(new IgniteTxKey(key, cctx.cacheId())); + + return requestedKeys; + } + } + + return null; + } + + /** * Finds pending mini future by the given mini ID. * * @param miniId Mini ID to find. @@ -621,6 +659,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean if (log.isDebugEnabled()) log.debug("Received onDone(..) callback [success=" + success + ", err=" + err + ", fut=" + this + ']'); + if (inTx() && cctx.tm().deadlockDetectionEnabled() && + (this.err instanceof IgniteTxTimeoutCheckedException || timedOut)) + return false; + // If locks were not acquired yet, delay completion. if (isDone() || (err == null && success && !checkLocks())) return false; @@ -727,7 +769,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean topVer = tx.topologyVersionSnapshot(); if (topVer != null) { - for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()){ + for (GridDhtTopologyFuture fut : cctx.shared().exchange().exchangeFutures()) { if (fut.topologyVersion().equals(topVer)){ Throwable err = fut.validateCache(cctx); @@ -1373,6 +1415,9 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean super(timeout); } + /** Requested keys. */ + private Set<IgniteTxKey> requestedKeys; + /** {@inheritDoc} */ @SuppressWarnings({"ThrowableInstanceNeverThrown"}) @Override public void onTimeout() { @@ -1381,7 +1426,42 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean timedOut = true; - onComplete(false, true); + if (inTx() && cctx.tm().deadlockDetectionEnabled()) { + synchronized (futs) { + requestedKeys = requestedKeys0(); + + futs.clear(); // Stop response processing. + } + + Set<IgniteTxKey> keys = new HashSet<>(); + + for (IgniteTxEntry txEntry : tx.allEntries()) { + if (!txEntry.locked()) + keys.add(txEntry.txKey()); + } + + IgniteInternalFuture<TxDeadlock> fut = cctx.tm().detectDeadlock(tx, keys); + + fut.listen(new IgniteInClosure<IgniteInternalFuture<TxDeadlock>>() { + @Override public void apply(IgniteInternalFuture<TxDeadlock> fut) { + try { + TxDeadlock deadlock = fut.get(); + + if (deadlock != null) + err = new TransactionDeadlockException(deadlock.toString(cctx.shared())); + } + catch (IgniteCheckedException e) { + err = e; + + U.warn(log, "Failed to detect deadlock.", e); + } + + onComplete(false, true); + } + }); + } + else + onComplete(false, true); } /** {@inheritDoc} */ @@ -1466,7 +1546,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean tx.removeMapping(node.id()); // Primary node left the grid, so fail the future. - GridNearLockFuture.this.onDone(newTopologyException(e, node.id())); + GridNearLockFuture.this.onDone(false, newTopologyException(e, node.id())); onDone(true); } @@ -1483,6 +1563,10 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } if (res.error() != null) { + if (inTx() && cctx.tm().deadlockDetectionEnabled() && + (res.error() instanceof IgniteTxTimeoutCheckedException || tx.remainingTime() == -1)) + return; + if (log.isDebugEnabled()) log.debug("Finishing mini future with an error due to error in response [miniFut=" + this + ", res=" + res + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 6515140..d251528 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -184,7 +184,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } } - if (e instanceof IgniteTxOptimisticCheckedException) { + if (e instanceof IgniteTxOptimisticCheckedException || e instanceof IgniteTxTimeoutCheckedException) { if (m != null) tx.removeMapping(m.node().id()); } @@ -424,10 +424,21 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim final ClusterNode n = m.node(); + long timeout = tx.remainingTime(); + + if (timeout == -1) { + IgniteCheckedException err = tx.timeoutException(); + + fut.onResult(err); + + return err; + } + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( futId, tx.topologyVersion(), tx, + timeout, m.reads(), m.writes(), m.near(), http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 1ea99c4..5a300ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.ArrayDeque; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -41,8 +43,11 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.TxDeadlock; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; +import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -53,7 +58,9 @@ import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; @@ -73,8 +80,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param cctx Context. * @param tx Transaction. */ - public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, - GridNearTxLocal tx) { + public GridNearOptimisticTxPrepareFuture(GridCacheSharedContext cctx, GridNearTxLocal tx) { super(cctx, tx); assert tx.optimistic() && !tx.serializable() : tx; @@ -85,7 +91,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (log.isDebugEnabled()) log.debug("Transaction future received owner changed callback: " + entry); - if ((entry.context().isNear() || entry.context().isLocal()) && owner != null && tx.hasWriteKey(entry.txKey())) { + if (tx.remainingTime() == -1) + return false; + + if ((entry.context().isNear() || entry.context().isLocal()) && + owner != null && tx.hasWriteKey(entry.txKey())) { if (keyLockFut != null) keyLockFut.onKeyLocked(entry.txKey()); @@ -124,6 +134,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param discoThread {@code True} if executed from discovery thread. */ void onError(Throwable e, boolean discoThread) { + if (e instanceof IgniteTxTimeoutCheckedException) { + onTimeout(); + + return; + } + if (X.hasCause(e, ClusterTopologyCheckedException.class) || X.hasCause(e, ClusterTopologyException.class)) { if (tx.onePhaseCommit()) { tx.markForBackupCheck(); @@ -160,7 +176,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (mini != null) { assert mini.node().id().equals(nodeId); - mini.onResult(nodeId, res); + mini.onResult(res); } else { if (msgLog.isDebugEnabled()) { @@ -182,6 +198,33 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } /** + * @return Keys for which {@link MiniFuture} isn't completed. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + public Set<IgniteTxKey> requestedKeys() { + synchronized (futs) { + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i); + + if (isMini(fut) && !fut.isDone()) { + MiniFuture miniFut = (MiniFuture)fut; + + Collection<IgniteTxEntry> entries = miniFut.mapping().entries(); + + Set<IgniteTxKey> keys = U.newHashSet(entries.size()); + + for (IgniteTxEntry entry : entries) + keys.add(entry.txKey()); + + return keys; + } + } + } + + return null; + } + + /** * Finds pending mini future by the given mini ID. * * @param miniId Mini ID to find. @@ -264,7 +307,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (!txStateCheck) { if (tx.setRollbackOnly()) { - if (tx.timedOut()) + if (tx.remainingTime() == -1) onError(new IgniteTxTimeoutCheckedException("Transaction timed out and " + "was rolled back: " + this), false); else @@ -437,89 +480,97 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa final ClusterNode n = m.node(); - GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( - futId, - tx.topologyVersion(), - tx, - null, - m.writes(), - m.near(), - txMapping.transactionNodes(), - m.last(), - tx.onePhaseCommit(), - tx.needReturnValue() && tx.implicit(), - tx.implicitSingle(), - m.explicitLock(), - tx.subjectId(), - tx.taskNameHash(), - m.clientFirst(), - tx.activeCachesDeploymentEnabled()); - - for (IgniteTxEntry txEntry : m.entries()) { - if (txEntry.op() == TRANSFORM) - req.addDhtVersion(txEntry.txKey(), null); - } + long timeout = tx.remainingTime(); + + if (timeout != -1) { + GridNearTxPrepareRequest req = new GridNearTxPrepareRequest( + futId, + tx.topologyVersion(), + tx, + timeout, + null, + m.writes(), + m.near(), + txMapping.transactionNodes(), + m.last(), + tx.onePhaseCommit(), + tx.needReturnValue() && tx.implicit(), + tx.implicitSingle(), + m.explicitLock(), + tx.subjectId(), + tx.taskNameHash(), + m.clientFirst(), + tx.activeCachesDeploymentEnabled()); + + for (IgniteTxEntry txEntry : m.entries()) { + if (txEntry.op() == TRANSFORM) + req.addDhtVersion(txEntry.txKey(), null); + } - // Must lock near entries separately. - if (m.near()) { - try { - tx.optimisticLockEntries(req.writes()); + // Must lock near entries separately. + if (m.near()) { + try { + tx.optimisticLockEntries(req.writes()); - tx.userPrepare(); - } - catch (IgniteCheckedException e) { - onError(e, false); + tx.userPrepare(); + } + catch (IgniteCheckedException e) { + onError(e, false); + } } - } - final MiniFuture fut = new MiniFuture(this, m, mappings); + final MiniFuture fut = new MiniFuture(this, m, mappings); - req.miniId(fut.futureId()); + req.miniId(fut.futureId()); - add(fut); // Append new future. + add(fut); // Append new future. - // If this is the primary node for the keys. - if (n.isLocal()) { - // At this point, if any new node joined, then it is - // waiting for this transaction to complete, so - // partition reassignments are not possible here. - IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = cctx.tm().txHandler().prepareTx(n.id(), tx, req); + // If this is the primary node for the keys. + if (n.isLocal()) { + // At this point, if any new node joined, then it is + // waiting for this transaction to complete, so + // partition reassignments are not possible here. + IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = + cctx.tm().txHandler().prepareTx(n.id(), tx, req); - prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { - @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { - try { - fut.onResult(n.id(), prepFut.get()); + prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { + @Override public void apply(IgniteInternalFuture<GridNearTxPrepareResponse> prepFut) { + try { + fut.onResult(prepFut.get()); + } + catch (IgniteCheckedException e) { + fut.onResult(e); + } } - catch (IgniteCheckedException e) { - fut.onResult(e); + }); + } + else { + try { + cctx.io().send(n, req, tx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() + + ", node=" + n.id() + ']'); } } - }); - } - else { - try { - cctx.io().send(n, req, tx.ioPolicy()); + catch (ClusterTopologyCheckedException e) { + e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near optimistic prepare fut, sent request [txId=" + tx.nearXidVersion() + - ", node=" + n.id() + ']'); + fut.onNodeLeft(e, false); } - } - catch (ClusterTopologyCheckedException e) { - e.retryReadyFuture(cctx.nextAffinityReadyFuture(tx.topologyVersion())); + catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near optimistic prepare fut, failed to sent request [txId=" + tx.nearXidVersion() + + ", node=" + n.id() + + ", err=" + e + ']'); + } - fut.onNodeLeft(e, false); - } - catch (IgniteCheckedException e) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near optimistic prepare fut, failed to sent request [txId=" + tx.nearXidVersion() + - ", node=" + n.id() + - ", err=" + e + ']'); + fut.onResult(e); } - - fut.onResult(e); } } + else + onTimeout(); } finally { if (set) @@ -623,6 +674,61 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return cur; } + /** + * + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private void onTimeout() { + if (cctx.tm().deadlockDetectionEnabled()) { + Set<IgniteTxKey> keys = null; + + if (keyLockFut != null) + keys = new HashSet<>(keyLockFut.lockKeys); + else { + if (futs != null && !futs.isEmpty()) { + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i); + + if (isMini(fut) && !fut.isDone()) { + MiniFuture miniFut = (MiniFuture)fut; + + Collection<IgniteTxEntry> entries = miniFut.mapping().entries(); + + keys = U.newHashSet(entries.size()); + + for (IgniteTxEntry entry : entries) + keys.add(entry.txKey()); + + break; + } + } + } + } + + add(new GridEmbeddedFuture<>(new IgniteBiClosure<TxDeadlock, Exception, GridNearTxPrepareResponse>() { + @Override public GridNearTxPrepareResponse apply(TxDeadlock deadlock, Exception e) { + if (e != null) + U.warn(log, "Failed to detect deadlock.", e); + else { + e = new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout for " + + "transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']', + deadlock != null ? new TransactionDeadlockException(deadlock.toString(cctx)) : null); + } + + onDone(null, e); + + return null; + } + }, cctx.tm().detectDeadlock(tx, keys))); + } + else { + ERR_UPD.compareAndSet(this, null, new IgniteTxTimeoutCheckedException("Failed to acquire lock " + + "within provided timeout for transaction [timeout=" + tx.timeout() + ", tx=" + tx + ']')); + + onComplete(false); + } + } + /** {@inheritDoc} */ @Override public String toString() { Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() { @@ -652,7 +758,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa private static final long serialVersionUID = 0L; /** Receive result flag updater. */ - private static AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD = + private static final AtomicIntegerFieldUpdater<MiniFuture> RCV_RES_UPD = AtomicIntegerFieldUpdater.newUpdater(MiniFuture.class, "rcvRes"); /** Parent future. */ @@ -745,15 +851,21 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } /** - * @param nodeId Failed node ID. * @param res Result callback. */ @SuppressWarnings("ThrowableResultOfMethodCallIgnored") - void onResult(UUID nodeId, final GridNearTxPrepareResponse res) { + void onResult(final GridNearTxPrepareResponse res) { if (isDone()) return; if (RCV_RES_UPD.compareAndSet(this, 0, 1)) { + if (parent.cctx.tm().deadlockDetectionEnabled() && + (parent.tx.remainingTime() == -1 || res.error() instanceof IgniteTxTimeoutCheckedException)) { + parent.onTimeout(); + + return; + } + if (res.error() != null) { // Fail the whole compound future. parent.onError(res.error(), false); @@ -801,8 +913,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa */ private void remap() { parent.prepareOnTopology(true, new Runnable() { - @Override - public void run() { + @Override public void run() { onDone((GridNearTxPrepareResponse) null); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index 4d77a3c..a00cf3e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -40,8 +40,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT * @param cctx Context. * @param tx Transaction. */ - public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, - GridNearTxLocal tx) { + public GridNearOptimisticTxPrepareFutureAdapter(GridCacheSharedContext cctx, GridNearTxLocal tx) { super(cctx, tx); assert tx.optimistic() : tx; @@ -172,7 +171,7 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT protected static class KeyLockFuture extends GridFutureAdapter<GridNearTxPrepareResponse> { /** */ @GridToStringInclude - private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + protected Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); /** */ private volatile boolean allKeysAdded; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index ef2edc9..34b8281 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -153,7 +153,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA @Override public void prepare() { if (!tx.state(PREPARING)) { if (tx.setRollbackOnly()) { - if (tx.timedOut()) + if (tx.remainingTime() == -1) onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx)); else onDone(new IgniteCheckedException("Invalid transaction state for prepare " + @@ -222,6 +222,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA checkOnePhase(); + long timeout = tx.remainingTime(); + + if (timeout == -1) + onDone(new IgniteTxTimeoutCheckedException("Transaction timed out and was rolled back: " + tx)); + for (final GridDistributedTxMapping m : mappings.values()) { final ClusterNode node = m.node(); @@ -229,6 +234,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA futId, tx.topologyVersion(), tx, + timeout, m.reads(), m.writes(), m.near(), http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index adde63c..bb5d482 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -633,6 +633,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu if (m.explicitLock()) syncMode = FULL_SYNC; + // Version to be added in completed versions on primary node. + GridCacheVersion completedVer = !commit && tx.timeout() > 0 ? tx.xidVersion() : null; + GridNearTxFinishRequest req = new GridNearTxFinishRequest( futId, tx.xidVersion(), @@ -645,7 +648,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu m.explicitLock(), tx.storeEnabled(), tx.topologyVersion(), - null, + completedVer, // Reuse 'baseVersion' to do not add new fields in message. null, null, tx.size(), http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 28c60d4..410baf8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -796,6 +796,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { GridNearTxPrepareFutureAdapter fut = (GridNearTxPrepareFutureAdapter)prepFut; if (fut == null) { + long timeout = remainingTime(); + // Future must be created before any exception can be thrown. if (optimistic()) { fut = serializable() ? @@ -807,6 +809,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { if (!PREP_FUT_UPD.compareAndSet(this, null, fut)) return prepFut; + + if (timeout == -1) { + fut.onDone(this, timeoutException()); + + return fut; + } } else // Prepare was called explicitly. @@ -964,8 +972,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { Map<UUID, Collection<UUID>> txNodes, boolean last ) { + long timeout = remainingTime(); + if (state() != PREPARING) { - if (timedOut()) + if (timeout == -1) return new GridFinishedFuture<>( new IgniteTxTimeoutCheckedException("Transaction timed out: " + this)); @@ -975,11 +985,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { new IgniteCheckedException("Invalid transaction state for prepare [state=" + state() + ", tx=" + this + ']')); } + if (timeout == -1) + return new GridFinishedFuture<>(timeoutException()); + init(); GridDhtTxPrepareFuture fut = new GridDhtTxPrepareFuture( cctx, this, + timeout, IgniteUuid.randomUuid(), Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), last, http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java index 9dfdb43..e55566b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxPrepareRequest.java @@ -94,6 +94,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { * @param futId Future ID. * @param topVer Topology version. * @param tx Transaction. + * @param timeout Transaction timeout. * @param reads Read entries. * @param writes Write entries. * @param near {@code True} if mapping is for near caches. @@ -112,6 +113,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { IgniteUuid futId, AffinityTopologyVersion topVer, IgniteInternalTx tx, + long timeout, Collection<IgniteTxEntry> reads, Collection<IgniteTxEntry> writes, boolean near, @@ -126,7 +128,7 @@ public class GridNearTxPrepareRequest extends GridDistributedTxPrepareRequest { boolean firstClientReq, boolean addDepInfo ) { - super(tx, reads, writes, txNodes, onePhaseCommit, addDepInfo); + super(tx, timeout, reads, writes, txNodes, onePhaseCommit, addDepInfo); assert futId != null; assert !firstClientReq || tx.optimistic() : tx; http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 8c0425d..dd900fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -33,7 +33,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.lang.GridTuple; import org.apache.ignite.lang.IgniteAsyncSupported; @@ -46,7 +45,7 @@ import org.jetbrains.annotations.Nullable; /** * Transaction managed by cache ({@code 'Ex'} stands for external). */ -public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { +public interface IgniteInternalTx extends AutoCloseable { /** * */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index f76f4bf..eb2989e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -712,7 +712,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement /** * @return Transaction timeout exception. */ - protected final IgniteCheckedException timeoutException() { + public final IgniteCheckedException timeoutException() { return new IgniteTxTimeoutCheckedException("Failed to acquire lock within provided timeout " + "for transaction [timeout=" + timeout() + ", tx=" + this + ']'); } @@ -1032,7 +1032,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement * @return {@code True} if state changed. */ @SuppressWarnings({"TooBroadScope"}) - private boolean state(TransactionState state, boolean timedOut) { + protected boolean state(TransactionState state, boolean timedOut) { boolean valid = false; TransactionState prev; @@ -1154,24 +1154,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return xidVer.asGridUuid(); - } - - /** {@inheritDoc} */ - @Override public long endTime() { - long endTime = timeout == 0 ? Long.MAX_VALUE : startTime + timeout; - - return endTime > 0 ? endTime : endTime < 0 ? Long.MAX_VALUE : endTime; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - if (local() && !dht()) - state(MARKED_ROLLBACK, true); - } - - /** {@inheritDoc} */ @Override public boolean timedOut() { return timedOut; } @@ -2387,21 +2369,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** {@inheritDoc} */ - @Override public IgniteUuid timeoutId() { - return null; - } - - /** {@inheritDoc} */ - @Override public long endTime() { - return 0; - } - - /** {@inheritDoc} */ - @Override public void onTimeout() { - // No-op. - } - - /** {@inheritDoc} */ @Override public boolean equals(Object o) { return this == o || o instanceof IgniteInternalTx && xid.equals(((IgniteInternalTx)o).xid()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 7c3c206..e67e60f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -112,8 +112,7 @@ public class IgniteTxHandler { * @param req Request. * @return Prepare future. */ - public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, - final GridNearTxPrepareRequest req) { + public IgniteInternalFuture<?> processNearTxPrepareRequest(final UUID nearNodeId, GridNearTxPrepareRequest req) { if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Received near prepare request [txId=" + req.version() + ", node=" + nearNodeId + ']'); @@ -361,7 +360,7 @@ public class IgniteTxHandler { req.deployInfo() != null); try { - ctx.io().send(nearNode, res, req.policy()); + ctx.io().send(nearNodeId, res, req.policy()); if (txPrepareMsgLog.isDebugEnabled()) { txPrepareMsgLog.debug("Sent remap response for near prepare [txId=" + req.version() + @@ -667,6 +666,10 @@ public class IgniteTxHandler { assert nodeId != null; assert req != null; + // 'baseVersion' message field is re-used for version to be added in completed versions. + if (!req.commit() && req.baseVersion() != null) + ctx.tm().addRolledbackTx(null, req.baseVersion()); + // Transaction on local cache only. if (locTx != null && !locTx.nearLocallyMapped() && !locTx.colocatedLocallyMapped()) return new GridFinishedFuture<IgniteInternalTx>(locTx); http://git-wip-us.apache.org/repos/asf/ignite/blob/0465874d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index d9aca4a..9ad7fb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -107,6 +107,7 @@ import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; +import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.ROLLING_BACK; @@ -547,14 +548,15 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig @SuppressWarnings({"CatchGenericClass"}) public void userPrepare() throws IgniteCheckedException { if (state() != PREPARING) { - if (timedOut()) + if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); TransactionState state = state(); setRollbackOnly(); - throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + state + ", tx=" + this + ']'); + throw new IgniteCheckedException("Invalid transaction state for prepare [state=" + + state + ", tx=" + this + ']'); } checkValid(); @@ -629,7 +631,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig TransactionState state = state(); if (state != COMMITTING) { - if (timedOut()) + if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Transaction timed out: " + this); setRollbackOnly(); @@ -3540,8 +3542,11 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @throws IgniteCheckedException If transaction check failed. */ protected void checkValid() throws IgniteCheckedException { + if (local() && !dht() && remainingTime() == -1) + state(MARKED_ROLLBACK, true); + if (isRollbackOnly()) { - if (timedOut()) + if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Cache transaction timed out: " + this); TransactionState state = state(); @@ -3556,10 +3561,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig throw new IgniteCheckedException("Cache transaction marked as rollback-only: " + this); } - - if (remainingTime() == -1 && setRollbackOnly()) - throw new IgniteTxTimeoutCheckedException("Cache transaction timed out " + - "(was rolled back automatically): " + this); } /** {@inheritDoc} */ @@ -3604,7 +3605,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig TransactionState state = state(); - assert state == TransactionState.ACTIVE || timedOut() : + assert state == TransactionState.ACTIVE || remainingTime() == -1 : "Invalid tx state for adding entry [op=" + op + ", val=" + val + ", entry=" + entry + ", filter=" + Arrays.toString(filter) + ", txCtx=" + cctx.tm().txContextVersion() + ", tx=" + this + ']';
