IGNITE-6181 wip.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/12669b86
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/12669b86
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/12669b86

Branch: refs/heads/ignite-6181-1
Commit: 12669b86a0ea2e91ebff778ef9b1661b37ffc781
Parents: 8ab36ce
Author: ascherbakoff <[email protected]>
Authored: Sat Sep 9 16:04:08 2017 +0300
Committer: ascherbakoff <[email protected]>
Committed: Sat Sep 9 16:04:08 2017 +0300

----------------------------------------------------------------------
 .../IgniteDiagnosticPrepareContext.java         |  2 +-
 .../distributed/near/GridNearLockFuture.java    |  5 +-
 .../cache/distributed/near/GridNearTxLocal.java | 18 ++---
 .../transactions/IgniteTransactionsImpl.java    |  3 +-
 .../cache/transactions/IgniteTxManager.java     | 77 ++++++++------------
 5 files changed, 38 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
index 378dc74..14783d5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticPrepareContext.java
@@ -75,7 +75,7 @@ public class IgniteDiagnosticPrepareContext {
      * @param keys Entry keys.
      * @param msg Initial message.
      */
-    public void  txKeyInfo(UUID nodeId, int cacheId, 
Collection<KeyCacheObject> keys, String msg) {
+    public void txKeyInfo(UUID nodeId, int cacheId, Collection<KeyCacheObject> 
keys, String msg) {
         closure(nodeId).add(msg, new TxEntriesInfoClosure(cacheId, keys));
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
index bb71337..9cad49e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java
@@ -770,10 +770,7 @@ public final class GridNearLockFuture extends 
GridCacheCompoundIdentityFuture<Bo
      * part. Note that if primary node leaves grid, the future will fail and 
transaction will be rolled back.
      */
     void map() {
-        // Obtain the topology version to use.
-        long threadId = Thread.currentThread().getId();
-
-        AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(threadId);
+        AffinityTopologyVersion topVer = 
cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
 
         // If there is another system transaction in progress, use it's 
topology version to prevent deadlock.
         if (topVer == null && tx != null && tx.system())

http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index 0c76aec..8d15e87 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -220,7 +220,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
implements GridTimeou
             plc,
             concurrency,
             isolation,
-            timeout == 0 ? 0 : Math.max(100, timeout),
+            timeout,
             false,
             storeEnabled,
             false,
@@ -230,7 +230,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
implements GridTimeou
 
         mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new 
IgniteTxMappingsImpl();
 
-        if (this.timeout > 0)
+        if (this.timeout() > 0 && !implicit())
             cctx.time().addTimeoutObject(this);
 
         initResult();
@@ -3151,7 +3151,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
             // Prepare was called explicitly.
             return fut;
 
-        if (timeout() > 0)
+        if (timeout() > 0 && !implicit())
             cctx.time().removeTimeoutObject(this);
 
         mapExplicitLocks();
@@ -3266,7 +3266,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
         if (log.isDebugEnabled())
             log.debug("Rolling back near tx: " + this);
 
-        if (remainingTime() > 0)
+        if (remainingTime() > 0 && !implicit())
             cctx.time().removeTimeoutObject(this);
 
         if (fastFinish()) {
@@ -3701,7 +3701,7 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
         if (state != ROLLING_BACK && state != ROLLED_BACK && state != 
COMMITTING && state != COMMITTED)
             rollback();
 
-        cctx.tm().onLocalClose(threadId(), this);
+        cctx.tm().onLocalClose();
 
         synchronized (this) {
             try {
@@ -4006,14 +4006,6 @@ public class GridNearTxLocal extends 
GridDhtTxLocalAdapter implements GridTimeou
     }
 
     /**
-     * @param threadId new owner of transaction.
-     * @throws IgniteCheckedException if method executed not in the middle of 
resume or suspend.
-     */
-    public void threadId(long threadId) {
-        this.threadId = threadId;
-    }
-
-    /**
      * Post-lock closure.
      *
      * @param <T> Return type.

http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
index 9891484..df5c90e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java
@@ -155,8 +155,7 @@ public class IgniteTransactionsImpl<K, V> implements 
IgniteTransactionsEx {
         try {
             GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx);
 
-            // Allow to start new transaction if previous transaction was 
rolled back by timeout.
-            if (tx != null && !(tx.state() == TransactionState.ROLLED_BACK && 
tx.timedOut()))
+            if (tx != null)
                 throw new IllegalStateException("Failed to start new 
transaction " +
                     "(current thread already has a transaction): " + tx);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/12669b86/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 7ff36ac..d65832c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -148,8 +148,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     /** Topology version should be used when mapping internal tx. */
     private final ThreadLocal<AffinityTopologyVersion> txTop = new 
ThreadLocal<>();
 
-    /** Per-thread transaction map. */
-    private final ConcurrentMap<Long, IgniteInternalTx> threadMap = newMap();
+    /** User transaction. */
+    private final static ThreadLocal<IgniteInternalTx> userTx = new 
ThreadLocal<>();
 
     /** Per-thread system transaction map. */
     private final ConcurrentMap<TxThreadKey, IgniteInternalTx> sysThreadMap = 
newMap();
@@ -282,19 +282,17 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @param cacheId Cache ID.
      */
     public void rollbackTransactionsForCache(int cacheId) {
-        rollbackTransactionsForCache(cacheId, nearIdMap);
+        rollbackTransactionsForCache(cacheId, nearIdMap.values());
 
-        rollbackTransactionsForCache(cacheId, threadMap);
+        rollbackTransactionsForCache(cacheId, 
Collections.singleton(userTx.get()));
     }
 
     /**
      * @param cacheId Cache ID.
      * @param txMap Transactions map.
      */
-    private void rollbackTransactionsForCache(int cacheId, ConcurrentMap<?, 
IgniteInternalTx> txMap) {
-        for (Map.Entry<?, IgniteInternalTx> e : txMap.entrySet()) {
-            IgniteInternalTx tx = e.getValue();
-
+    private void rollbackTransactionsForCache(int cacheId, 
Collection<IgniteInternalTx> txMap) {
+        for (IgniteInternalTx tx : txMap) {
             for (IgniteTxEntry entry : tx.allEntries()) {
                 if (entry.cacheId() == cacheId) {
                     rollbackTx(tx);
@@ -309,8 +307,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @Override public void onDisconnected(IgniteFuture reconnectFut) {
         txFinishSync.onDisconnected(reconnectFut);
 
-        for (Map.Entry<Long, IgniteInternalTx> e : threadMap.entrySet())
-            rollbackTx(e.getValue());
+        for (IgniteInternalTx tx : activeTransactions())
+            rollbackTx(tx);
 
         IgniteClientDisconnectedException err =
             new IgniteClientDisconnectedException(reconnectFut, "Client node 
disconnected.");
@@ -369,8 +367,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @Override public void printMemoryStats() {
         X.println(">>> ");
         X.println(">>> Transaction manager memory stats [igniteInstanceName=" 
+ cctx.igniteInstanceName() + ']');
-        X.println(">>>   threadMapSize: " + threadMap.size());
-        X.println(">>>   idMap [size=" + idMap.size() + ']');
+        X.println(">>>   activeSize [size=" + activeTransactions().size() + 
']');
         X.println(">>>   completedVersSortedSize: " + 
completedVersSorted.size());
         X.println(">>>   completedVersHashMapSize: " + 
completedVersHashMap.sizex());
     }
@@ -379,7 +376,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return Thread map size.
      */
     public int threadMapSize() {
-        return threadMap.size();
+        return 0;
     }
 
     /**
@@ -488,7 +485,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
             // and overwrite local transaction.
             if (tx.local() && !tx.dht()) {
                 if (cacheCtx == null || !cacheCtx.systemTx())
-                    threadMap.put(tx.threadId(), tx);
+                    userTx.set(tx);
                 else
                     sysThreadMap.put(new TxThreadKey(tx.threadId(), 
cacheCtx.cacheId()), tx);
             }
@@ -665,7 +662,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @return Not null topology version if current thread holds lock 
preventing topology change.
      */
     @Nullable public AffinityTopologyVersion lockedTopologyVersion(long 
threadId, IgniteInternalTx ignore) {
-        IgniteInternalTx tx = threadMap.get(threadId);
+        IgniteInternalTx tx = userTx.get();
 
         if (tx != null) {
             AffinityTopologyVersion topVer = tx.topologyVersionSnapshot();
@@ -772,7 +769,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     @SuppressWarnings({"unchecked"})
     private <T> T tx(GridCacheContext cctx, long threadId) {
         if (cctx == null || !cctx.systemTx())
-            return (T)threadMap.get(threadId);
+            return (T) userTx.get();
 
         TxThreadKey key = new TxThreadKey(threadId, cctx.cacheId());
 
@@ -1421,26 +1418,19 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      * @param tx Transaction to clear.
      */
     private void clearThreadMap(IgniteInternalTx tx) {
-        if (tx.local() && !tx.dht()) {
-            if (!tx.system()) {
-                /** Timed out local transactions are cleared in {@link 
#onLocalClose}. */
-                if (!tx.timedOut() || !tx.pessimistic())
-                    threadMap.remove(tx.threadId(), tx);
-            }
-            else {
-                Integer cacheId = tx.txState().firstCacheId();
+        if (tx.local() && !tx.dht() && tx.system()) {
+            Integer cacheId = tx.txState().firstCacheId();
 
-                if (cacheId != null)
-                    sysThreadMap.remove(new TxThreadKey(tx.threadId(), 
cacheId), tx);
-                else {
-                    for (Iterator<IgniteInternalTx> it = 
sysThreadMap.values().iterator(); it.hasNext(); ) {
-                        IgniteInternalTx txx = it.next();
+            if (cacheId != null)
+                sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), 
tx);
+            else {
+                for (Iterator<IgniteInternalTx> it = 
sysThreadMap.values().iterator(); it.hasNext(); ) {
+                    IgniteInternalTx txx = it.next();
 
-                        if (tx == txx) {
-                            it.remove();
+                    if (tx == txx) {
+                        it.remove();
 
-                            break;
-                        }
+                        break;
                     }
                 }
             }
@@ -2279,7 +2269,6 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
      */
     public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException {
         assert tx != null && !tx.system() : tx;
-        assert !threadMap.containsValue(tx) : tx;
         assert !transactionMap(tx).containsValue(tx) : tx;
         assert !haveSystemTxForThread(Thread.currentThread().getId());
 
@@ -2288,15 +2277,13 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                 + "[expected=" + SUSPENDED + ", actual=" + tx.state() + ']');
         }
 
-        long threadId = Thread.currentThread().getId();
+        if (userTx.get() != null)
+            throw new IgniteCheckedException("Thread already has started 
transaction.");
 
-        if (threadMap.putIfAbsent(threadId, tx) != null)
-            throw new IgniteCheckedException("Thread already start a 
transaction.");
+        userTx.set(tx);
 
         if (transactionMap(tx).putIfAbsent(tx.xidVersion(), tx) != null)
-            throw new IgniteCheckedException("Thread already start a 
transaction.");
-
-        tx.threadId(threadId);
+            throw new IgniteCheckedException("Thread already has started 
transaction.");
     }
 
     /**
@@ -2317,13 +2304,9 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
         return false;
     }
 
-    /**
-     * Callback for closing local transaction.
-     * @param threadId Thread ID.
-     * @param tx Local transaction.
-     */
-    public void onLocalClose(long threadId, GridNearTxLocal tx) {
-        threadMap.remove(threadId, tx);
+    /** */
+    public void onLocalClose() {
+        userTx.set(null);
     }
 
     /**

Reply via email to