IGNITE-6181 wip.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/12669b86
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/12669b86
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/12669b86

Branch: refs/heads/ignite-6181-1
Commit: 12669b86a0ea2e91ebff778ef9b1661b37ffc781
Parents: 8ab36ce
Author: ascherbakoff <[email protected]>
Authored: Sat Sep 9 16:04:08 2017 +0300
Committer: ascherbakoff <[email protected]>
Committed: Sat Sep 9 16:04:08 2017 +0300

----------------------------------------------------------------------
 .../IgniteDiagnosticPrepareContext.java         |  2 +-
 .../distributed/near/GridNearLockFuture.java    |  5 +-
 .../cache/distributed/near/GridNearTxLocal.java | 18 ++---
 .../transactions/IgniteTransactionsImpl.java    |  3 +-
 .../cache/transactions/IgniteTxManager.java     | 77 ++++++++------------
 5 files changed, 38 insertions(+), 67 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
index 378dc74..14783d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
@@ -75,7 +75,7 @@ public class IgniteDiagnosticPrepareContext {
      * @param keys Entry keys.
      * @param msg Initial message.
*/ - public void txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> keys, String msg) { + public void txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> keys, String msg) { closure(nodeId).add(msg, new TxEntriesInfoClosure(cacheId, keys)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index bb71337..9cad49e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -770,10 +770,7 @@ public final class GridNearLockFuture extends GridCacheCompoundIdentityFuture<Bo * part. Note that if primary node leaves grid, the future will fail and transaction will be rolled back. */ void map() { - // Obtain the topology version to use. - long threadId = Thread.currentThread().getId(); - - AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); + AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); // If there is another system transaction in progress, use it's topology version to prevent deadlock. 
if (topVer == null && tx != null && tx.system()) http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 0c76aec..8d15e87 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -220,7 +220,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou plc, concurrency, isolation, - timeout == 0 ? 0 : Math.max(100, timeout), + timeout, false, storeEnabled, false, @@ -230,7 +230,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl(); - if (this.timeout > 0) + if (this.timeout() > 0 && !implicit()) cctx.time().addTimeoutObject(this); initResult(); @@ -3151,7 +3151,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou // Prepare was called explicitly. 
return fut; - if (timeout() > 0) + if (timeout() > 0 && !implicit()) cctx.time().removeTimeoutObject(this); mapExplicitLocks(); @@ -3266,7 +3266,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (log.isDebugEnabled()) log.debug("Rolling back near tx: " + this); - if (remainingTime() > 0) + if (remainingTime() > 0 && !implicit()) cctx.time().removeTimeoutObject(this); if (fastFinish()) { @@ -3701,7 +3701,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) rollback(); - cctx.tm().onLocalClose(threadId(), this); + cctx.tm().onLocalClose(); synchronized (this) { try { @@ -4006,14 +4006,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** - * @param threadId new owner of transaction. - * @throws IgniteCheckedException if method executed not in the middle of resume or suspend. - */ - public void threadId(long threadId) { - this.threadId = threadId; - } - - /** * Post-lock closure. * * @param <T> Return type. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 9891484..df5c90e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -155,8 +155,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { try { GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx); - // Allow to start new transaction if previous transaction was rolled back by timeout. - if (tx != null && !(tx.state() == TransactionState.ROLLED_BACK && tx.timedOut())) + if (tx != null) throw new IllegalStateException("Failed to start new transaction " + "(current thread already has a transaction): " + tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 7ff36ac..d65832c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -148,8 +148,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter { /** Topology version should be used when mapping internal tx. */ private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>(); - /** Per-thread transaction map. */ - private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); + /** User transaction. */ + private final static ThreadLocal<IgniteInternalTx> userTx = new ThreadLocal<>(); /** Per-thread system transaction map. */ private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = newMap(); @@ -282,19 +282,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param cacheId Cache ID. */ public void rollbackTransactionsForCache(int cacheId) { - rollbackTransactionsForCache(cacheId, nearIdMap); + rollbackTransactionsForCache(cacheId, nearIdMap.values()); - rollbackTransactionsForCache(cacheId, threadMap); + rollbackTransactionsForCache(cacheId, Collections.singleton(userTx.get())); } /** * @param cacheId Cache ID. * @param txMap Transactions map. 
*/ - private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, IgniteInternalTx> txMap) { - for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) { - IgniteInternalTx tx = e.getValue(); - + private void rollbackTransactionsForCache(int cacheId, Collection<IgniteInternalTx> txMap) { + for (IgniteInternalTx tx : txMap) { for (IgniteTxEntry entry : tx.allEntries()) { if (entry.cacheId() == cacheId) { rollbackTx(tx); @@ -309,8 +307,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void onDisconnected(IgniteFuture reconnectFut) { txFinishSync.onDisconnected(reconnectFut); - for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet()) - rollbackTx(e.getValue()); + for (IgniteInternalTx tx : activeTransactions()) + rollbackTx(tx); IgniteClientDisconnectedException err = new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."); @@ -369,8 +367,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); - X.println(">>> threadMapSize: " + threadMap.size()); - X.println(">>> idMap [size=" + idMap.size() + ']'); + X.println(">>> activeSize [size=" + activeTransactions().size() + ']'); X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex()); } @@ -379,7 +376,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Thread map size. */ public int threadMapSize() { - return threadMap.size(); + return 0; } /** @@ -488,7 +485,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // and overwrite local transaction. 
if (tx.local() && !tx.dht()) { if (cacheCtx == null || !cacheCtx.systemTx()) - threadMap.put(tx.threadId(), tx); + userTx.set(tx); else sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); } @@ -665,7 +662,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Not null topology version if current thread holds lock preventing topology change. */ @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) { - IgniteInternalTx tx = threadMap.get(threadId); + IgniteInternalTx tx = userTx.get(); if (tx != null) { AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); @@ -772,7 +769,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"unchecked"}) private <T> T tx(GridCacheContext cctx, long threadId) { if (cctx == null || !cctx.systemTx()) - return (T)threadMap.get(threadId); + return (T) userTx.get(); TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId()); @@ -1421,26 +1418,19 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param tx Transaction to clear. */ private void clearThreadMap(IgniteInternalTx tx) { - if (tx.local() && !tx.dht()) { - if (!tx.system()) { - /** Timed out local transactions are cleared in {@link #onLocalClose}. 
*/ - if (!tx.timedOut() || !tx.pessimistic()) - threadMap.remove(tx.threadId(), tx); - } - else { - Integer cacheId = tx.txState().firstCacheId(); + if (tx.local() && !tx.dht() && tx.system()) { + Integer cacheId = tx.txState().firstCacheId(); - if (cacheId != null) - sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); - else { - for (Iterator<IgniteInternalTx> it = sysThreadMap.values().iterator(); it.hasNext(); ) { - IgniteInternalTx txx = it.next(); + if (cacheId != null) + sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); + else { + for (Iterator<IgniteInternalTx> it = sysThreadMap.values().iterator(); it.hasNext(); ) { + IgniteInternalTx txx = it.next(); - if (tx == txx) { - it.remove(); + if (tx == txx) { + it.remove(); - break; - } + break; } } } @@ -2279,7 +2269,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException { assert tx != null && !tx.system() : tx; - assert !threadMap.containsValue(tx) : tx; assert !transactionMap(tx).containsValue(tx) : tx; assert !haveSystemTxForThread(Thread.currentThread().getId()); @@ -2288,15 +2277,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); } - long threadId = Thread.currentThread().getId(); + if (userTx.get() != null) + throw new IgniteCheckedException("Thread already has started transaction."); - if (threadMap.putIfAbsent(threadId, tx) != null) - throw new IgniteCheckedException("Thread already start a transaction."); + userTx.set(tx); if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) - throw new IgniteCheckedException("Thread already start a transaction."); - - tx.threadId(threadId); + throw new IgniteCheckedException("Thread already has started transaction."); } /** @@ -2317,13 +2304,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { return false; } - /** - * Callback for 
closing local transaction. - * @param threadId Thread ID. - * @param tx Local transaction. - */ - public void onLocalClose(long threadId, GridNearTxLocal tx) { - threadMap.remove(threadId, tx); + /** */ + public void onLocalClose() { + userTx.set(null); } /**
