IGNITE-6181 wip.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19383384 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19383384 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19383384 Branch: refs/heads/ignite-6181-1 Commit: 1938338407f68d19dea3255144cfcca01cf16ef7 Parents: 467e0ba Author: Aleksei Scherbakov <[email protected]> Authored: Mon Sep 11 13:08:25 2017 +0300 Committer: Aleksei Scherbakov <[email protected]> Committed: Mon Sep 11 13:08:25 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 5 -- .../cache/distributed/near/GridNearTxLocal.java | 15 ++-- .../cache/transactions/IgniteTxManager.java | 75 +++++++------------- 3 files changed, 33 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/19383384/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 8e075b2..fed716c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -102,7 +102,6 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFi import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; -import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -4031,10 +4030,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V awaitLastFut(); - if (ctx.tm().isTimedOutThread(Thread.currentThread().getId())) - throw new IgniteTxTimeoutCheckedException("Previous transaction was rolled back due to timeout. " + - "Please start new transaction and retry an operation."); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); if (tx == null || tx.implicit()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/19383384/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 3d93289..289096d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -2486,7 +2486,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou processLoaded(map, keys, needVer, c); return null; - } catch (Exception e) { + } + catch (Exception e) { setRollbackOnly(); throw new GridClosureException(e); @@ -3225,13 +3226,15 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou prepareFut.get(); fut0.finish(true); - } catch (Error | RuntimeException e) { + } + catch (Error | RuntimeException e) { COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); fut0.finish(false); throw e; - } catch (IgniteCheckedException e) { + } + catch (IgniteCheckedException e) { COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e); if (!(e instanceof NodeStoppingException)) @@ -3276,9 +3279,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou return new GridFinishedFuture<>((IgniteInternalTx)this); } - if (timedOut()) - cctx.tm().markTimedOut(this); - GridNearTxFinishFuture fut = rollbackFut; if (fut != null) @@ -3694,6 +3694,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** {@inheritDoc} */ @Override public void close() throws IgniteCheckedException { + // If tx was rolled back asynchronously by timeout, user tx will not be cleared. + cctx.tm().resetUserTx(); + TransactionState state = state(); if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) http://git-wip-us.apache.org/repos/asf/ignite/blob/19383384/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 41e10cb..fb04c2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -29,8 +29,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; - import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; import org.apache.ignite.IgniteSystemProperties; @@ -150,11 +148,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { /** Topology version should be used when mapping internal tx. */ private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>(); - /** Per-thread transaction map. */ - private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap(); - - /** Thread ids associated with rolled back transactions. */ - private final ConcurrentSkipListSet<Long> rolledBackByTimeoutThreads = new ConcurrentSkipListSet<>(); + /** User transaction. */ + private final static ThreadLocal<IgniteInternalTx> userTx = new ThreadLocal<>(); /** Per-thread system transaction map. */ private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = newMap(); @@ -287,19 +282,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @param cacheId Cache ID. */ public void rollbackTransactionsForCache(int cacheId) { - rollbackTransactionsForCache(cacheId, nearIdMap); - - rollbackTransactionsForCache(cacheId, threadMap); + rollbackTransactionsForCache(cacheId, activeTransactions()); } /** * @param cacheId Cache ID. * @param txMap Transactions map. */ - private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, IgniteInternalTx> txMap) { - for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) { - IgniteInternalTx tx = e.getValue(); - + private void rollbackTransactionsForCache(int cacheId, Collection<IgniteInternalTx> txMap) { + for (IgniteInternalTx tx : txMap) { for (IgniteTxEntry entry : tx.allEntries()) { if (entry.cacheId() == cacheId) { rollbackTx(tx); @@ -314,8 +305,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void onDisconnected(IgniteFuture reconnectFut) { txFinishSync.onDisconnected(reconnectFut); - for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet()) - rollbackTx(e.getValue()); + for (IgniteInternalTx tx : activeTransactions()) + rollbackTx(tx); IgniteClientDisconnectedException err = new IgniteClientDisconnectedException(reconnectFut, "Client node disconnected."); @@ -374,8 +365,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Transaction manager memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + ']'); - X.println(">>> threadMapSize: " + threadMap.size()); - X.println(">>> idMap [size=" + idMap.size() + ']'); + X.println(">>> activeSize [size=" + activeTransactions().size() + ']'); X.println(">>> completedVersSortedSize: " + completedVersSorted.size()); X.println(">>> completedVersHashMapSize: " + completedVersHashMap.sizex()); } @@ -384,7 +374,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Thread map size. */ public int threadMapSize() { - return threadMap.size(); + return 0; } /** @@ -493,7 +483,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { // and overwrite local transaction. if (tx.local() && !tx.dht()) { if (cacheCtx == null || !cacheCtx.systemTx()) - threadMap.put(tx.threadId(), tx); + userTx.set(tx); else sysThreadMap.put(new TxThreadKey(tx.threadId(), cacheCtx.cacheId()), tx); } @@ -670,7 +660,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * @return Not null topology version if current thread holds lock preventing topology change. */ @Nullable public AffinityTopologyVersion lockedTopologyVersion(long threadId, IgniteInternalTx ignore) { - IgniteInternalTx tx = threadMap.get(threadId); + IgniteInternalTx tx = userTx.get(); if (tx != null) { AffinityTopologyVersion topVer = tx.topologyVersionSnapshot(); @@ -777,7 +767,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { @SuppressWarnings({"unchecked"}) private <T> T tx(GridCacheContext cctx, long threadId) { if (cctx == null || !cctx.systemTx()) - return (T)threadMap.get(threadId); + return (T) userTx.get(); TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId()); @@ -1428,7 +1418,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { private void clearThreadMap(IgniteInternalTx tx) { if (tx.local() && !tx.dht()) { if (!tx.system()) - threadMap.remove(tx.threadId(), tx); + userTx.set(null); else { Integer cacheId = tx.txState().firstCacheId(); @@ -1712,12 +1702,17 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { * Commit ended. */ public void resetContext() { - rolledBackByTimeoutThreads.remove(Thread.currentThread().getId()); - threadCtx.set(null); } /** + * Reset user tx. + */ + public void resetUserTx() { + userTx.set(null); + } + + /** * @return All transactions. */ public Collection<IgniteInternalTx> txs() { @@ -2291,35 +2286,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']'); } - long threadId = Thread.currentThread().getId(); + if (userTx.get() != null) + throw new IgniteCheckedException("The thread already has active transaction."); - if (threadMap.putIfAbsent(threadId, tx) != null) - throw new IgniteCheckedException("Thread already has started a transaction."); + userTx.set(tx); if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null) - throw new IgniteCheckedException("Thread already has started a transaction."); - - tx.threadId(threadId); - } - - /** - * Checks if thread belongs to timed out ids. - * - * @param threadId Thread id. - * @return {@code True} if current thread had a transaction rolled back by timeout. - */ - public boolean isTimedOutThread(long threadId) { - return rolledBackByTimeoutThreads.contains(threadId); - } - - /** - * Mark transaction thread as rolled back by timeout. - * Thread may not perform transactional ops until it will explicitly start a new transaction. - * - * @param tx Transaction. - */ - public void markTimedOut(GridNearTxLocal tx) { - rolledBackByTimeoutThreads.add(tx.threadId()); + throw new IgniteCheckedException("The thread already has active transaction."); } /**
