IGNITE-6181 wip.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/19383384
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/19383384
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/19383384

Branch: refs/heads/ignite-6181-1
Commit: 1938338407f68d19dea3255144cfcca01cf16ef7
Parents: 467e0ba
Author: Aleksei Scherbakov <[email protected]>
Authored: Mon Sep 11 13:08:25 2017 +0300
Committer: Aleksei Scherbakov <[email protected]>
Committed: Mon Sep 11 13:08:25 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  5 --
 .../cache/distributed/near/GridNearTxLocal.java | 15 ++--
 .../cache/transactions/IgniteTxManager.java     | 75 +++++++-------------
 3 files changed, 33 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/19383384/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 8e075b2..fed716c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -102,7 +102,6 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFi
 import org.apache.ignite.internal.processors.task.GridInternal;
 import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
-import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -4031,10 +4030,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         awaitLastFut();
 
-        if (ctx.tm().isTimedOutThread(Thread.currentThread().getId()))
-            throw new IgniteTxTimeoutCheckedException("Previous transaction was rolled back due to timeout. " +
-                    "Please start new transaction and retry an operation.");
-
         GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx);
 
         if (tx == null || tx.implicit()) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/19383384/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 3d93289..289096d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -2486,7 +2486,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou
                         processLoaded(map, keys, needVer, c);
 
                         return null;
-                    } catch (Exception e) {
+                    }
+                    catch (Exception e) {
                         setRollbackOnly();
 
                         throw new GridClosureException(e);
@@ -3225,13 +3226,15 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
                     prepareFut.get();
 
                     fut0.finish(true);
-                } catch (Error | RuntimeException e) {
+                }
+                catch (Error | RuntimeException e) {
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
 
                     fut0.finish(false);
 
                     throw e;
-                } catch (IgniteCheckedException e) {
+                }
+                catch (IgniteCheckedException e) {
                     COMMIT_ERR_UPD.compareAndSet(GridNearTxLocal.this, null, e);
 
                     if (!(e instanceof NodeStoppingException))
@@ -3276,9 +3279,6 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
             return new GridFinishedFuture<>((IgniteInternalTx)this);
         }
 
-        if (timedOut())
-            cctx.tm().markTimedOut(this);
-
         GridNearTxFinishFuture fut = rollbackFut;
 
         if (fut != null)
@@ -3694,6 +3694,9 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
 
     /** {@inheritDoc} */
     @Override public void close() throws IgniteCheckedException {
+        // If tx was rolled back asynchronously by timeout, user tx will not be cleared.
+        cctx.tm().resetUserTx();
+
         TransactionState state = state();
 
         if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED)

http://git-wip-us.apache.org/repos/asf/ignite/blob/19383384/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 41e10cb..fb04c2b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -29,8 +29,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteSystemProperties;
@@ -150,11 +148,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     /** Topology version should be used when mapping internal tx. */
     private final ThreadLocal<AffinityTopologyVersion> txTop = new ThreadLocal<>();
 
-    /** Per-thread transaction map. */
-    private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
-
-    /** Thread ids associated with rolled back transactions. */
-    private final ConcurrentSkipListSet<Long> rolledBackByTimeoutThreads = new ConcurrentSkipListSet<>();
+    /** User transaction. */
+    private final static ThreadLocal<IgniteInternalTx> userTx = new ThreadLocal<>();
 
     /** Per-thread system transaction map. */
     private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = 
newMap();
@@ -287,19 +282,15 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @param cacheId Cache ID.
      */
     public void rollbackTransactionsForCache(int cacheId) {
-        rollbackTransactionsForCache(cacheId, nearIdMap);
-
-        rollbackTransactionsForCache(cacheId, threadMap);
+        rollbackTransactionsForCache(cacheId, activeTransactions());
     }
 
     /**
      * @param cacheId Cache ID.
      * @param txMap Transactions map.
      */
-    private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, IgniteInternalTx> txMap) {
-        for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) {
-            IgniteInternalTx tx = e.getValue();
-
+    private void rollbackTransactionsForCache(int cacheId, Collection<IgniteInternalTx> txMap) {
+        for (IgniteInternalTx tx : txMap) {
             for (IgniteTxEntry entry : tx.allEntries()) {
                 if (entry.cacheId() == cacheId) {
                     rollbackTx(tx);
@@ -314,8 +305,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @Override public void onDisconnected(IgniteFuture reconnectFut) {
         txFinishSync.onDisconnected(reconnectFut);
 
-        for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
-            rollbackTx(e.getValue());
+        for (IgniteInternalTx tx : activeTransactions())
+            rollbackTx(tx);
 
         IgniteClientDisconnectedException err =
             new IgniteClientDisconnectedException(reconnectFut, "Client node 
disconnected.");
@@ -374,8 +365,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @Override public void printMemoryStats() {
         X.println(">>> ");
         X.println(">>> Transaction manager memory stats [igniteInstanceName=" 
+ cctx.igniteInstanceName() + ']');
-        X.println(">>>   threadMapSize: " + threadMap.size());
-        X.println(">>>   idMap [size=" + idMap.size() + ']');
+        X.println(">>>   activeSize [size=" + activeTransactions().size() + ']');
         X.println(">>>   completedVersSortedSize: " + 
completedVersSorted.size());
         X.println(">>>   completedVersHashMapSize: " + 
completedVersHashMap.sizex());
     }
@@ -384,7 +374,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return Thread map size.
      */
     public int threadMapSize() {
-        return threadMap.size();
+        return 0;
     }
 
     /**
@@ -493,7 +483,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
             // and overwrite local transaction.
             if (tx.local() && !tx.dht()) {
                 if (cacheCtx == null || !cacheCtx.systemTx())
-                    threadMap.put(tx.threadId(), tx);
+                    userTx.set(tx);
                 else
                     sysThreadMap.put(new TxThreadKey(tx.threadId(), 
cacheCtx.cacheId()), tx);
             }
@@ -670,7 +660,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return Not null topology version if current thread holds lock 
preventing topology change.
      */
     @Nullable public AffinityTopologyVersion lockedTopologyVersion(long 
threadId, IgniteInternalTx ignore) {
-        IgniteInternalTx tx = threadMap.get(threadId);
+        IgniteInternalTx tx = userTx.get();
 
         if (tx != null) {
             AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
@@ -777,7 +767,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked"})
     private <T> T tx(GridCacheContext cctx, long threadId) {
         if (cctx == null || !cctx.systemTx())
-            return (T)threadMap.get(threadId);
+            return (T) userTx.get();
 
         TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId());
 
@@ -1428,7 +1418,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     private void clearThreadMap(IgniteInternalTx tx) {
         if (tx.local() && !tx.dht()) {
             if (!tx.system())
-                threadMap.remove(tx.threadId(), tx);
+                userTx.set(null);
             else {
                 Integer cacheId = tx.txState().firstCacheId();
 
@@ -1712,12 +1702,17 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * Commit ended.
      */
     public void resetContext() {
-        rolledBackByTimeoutThreads.remove(Thread.currentThread().getId());
-
         threadCtx.set(null);
     }
 
     /**
+     * Reset user tx.
+     */
+    public void resetUserTx() {
+        userTx.set(null);
+    }
+
+    /**
      * @return All transactions.
      */
     public Collection<IgniteInternalTx> txs() {
@@ -2291,35 +2286,13 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                     + "[expected=" + SUSPENDED + ", actual=" + tx.state() + 
']');
         }
 
-        long threadId = Thread.currentThread().getId();
+        if (userTx.get() != null)
+            throw new IgniteCheckedException("The thread already has active transaction.");
 
-        if (threadMap.putIfAbsent(threadId, tx) != null)
-            throw new IgniteCheckedException("Thread already has started a transaction.");
+        userTx.set(tx);
 
         if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null)
-            throw new IgniteCheckedException("Thread already has started a transaction.");
-
-        tx.threadId(threadId);
-    }
-
-    /**
-     * Checks if thread belongs to timed out ids.
-     *
-     * @param threadId Thread id.
-     * @return {@code True} if current thread had a transaction rolled back by 
timeout.
-     */
-    public boolean isTimedOutThread(long threadId) {
-        return rolledBackByTimeoutThreads.contains(threadId);
-    }
-
-    /**
-     * Mark transaction thread as rolled back by timeout.
-     * Thread may not perform transactional ops until it will explicitly start 
a new transaction.
-     *
-     * @param tx Transaction.
-     */
-    public void markTimedOut(GridNearTxLocal tx) {
-        rolledBackByTimeoutThreads.add(tx.threadId());
+            throw new IgniteCheckedException("The thread already has active transaction.");
     }
 
     /**

Reply via email to