Repository: ignite
Updated Branches:
  refs/heads/ignite-single-op-tx f09d09fe5 -> dded8563d


'Single' operations optimizations for tx cache.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dded8563
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dded8563
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dded8563

Branch: refs/heads/ignite-single-op-tx
Commit: dded8563db0030d016669767f9afbe9199b158dd
Parents: f09d09f
Author: sboikov <sboi...@gridgain.com>
Authored: Fri Nov 13 16:31:10 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Fri Nov 13 16:31:10 2015 +0300

----------------------------------------------------------------------
 .../cache/GridCacheSharedContext.java           |  30 +--
 ...arOptimisticSerializableTxPrepareFuture.java |  13 -
 .../near/GridNearOptimisticTxPrepareFuture.java |  13 -
 ...ridNearOptimisticTxPrepareFutureAdapter.java |  70 +----
 .../near/GridNearTxFinishFuture.java            |  12 +-
 .../cache/distributed/near/GridNearTxLocal.java |  10 +-
 .../cache/transactions/IgniteInternalTx.java    |   5 +
 .../cache/transactions/IgniteTxAdapter.java     |  64 ++---
 .../transactions/IgniteTxImplicitStateImpl.java | 151 +++++++++++
 .../transactions/IgniteTxLocalAdapter.java      |  45 +---
 .../cache/transactions/IgniteTxManager.java     |  18 +-
 .../cache/transactions/IgniteTxState.java       |  56 ++++
 .../transactions/IgniteTxStateAdapter.java      |  41 +++
 .../cache/transactions/IgniteTxStateImpl.java   | 256 +++++++++++++++++++
 14 files changed, 553 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index b37742c..5321bb3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -582,12 +582,7 @@ public class GridCacheSharedContext<K, V> {
      * @throws IgniteCheckedException If failed.
      */
     public void endTx(IgniteInternalTx tx) throws IgniteCheckedException {
-        Collection<Integer> cacheIds = tx.activeCacheIds();
-
-        if (!cacheIds.isEmpty()) {
-            for (Integer cacheId : cacheIds)
-                cacheContext(cacheId).cache().awaitLastFut();
-        }
+        tx.txState().awaitLastFut(this);
 
         tx.close();
     }
@@ -597,21 +592,9 @@ public class GridCacheSharedContext<K, V> {
      * @return Commit future.
      */
     public IgniteInternalFuture<IgniteInternalTx> 
commitTxAsync(IgniteInternalTx tx) {
-        Collection<Integer> cacheIds = tx.activeCacheIds();
-
-        if (cacheIds.isEmpty())
-            return tx.commitAsync();
-        else if (cacheIds.size() == 1) {
-            int cacheId = F.first(cacheIds);
-
-            return cacheContext(cacheId).cache().commitTxAsync(tx);
-        }
-        else {
-            for (Integer cacheId : cacheIds)
-                cacheContext(cacheId).cache().awaitLastFut();
+        tx.txState().awaitLastFut(this);
 
-            return tx.commitAsync();
-        }
+        return tx.commitAsync();
     }
 
     /**
@@ -620,12 +603,7 @@ public class GridCacheSharedContext<K, V> {
      * @return Rollback future.
      */
     public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws 
IgniteCheckedException {
-        Collection<Integer> cacheIds = tx.activeCacheIds();
-
-        if (!cacheIds.isEmpty()) {
-            for (Integer cacheId : cacheIds)
-                cacheContext(cacheId).cache().awaitLastFut();
-        }
+        tx.txState().awaitLastFut(this);
 
         return tx.rollbackAsync();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 5488bb1..0ff8eae 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -326,19 +326,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
 
         txMapping = new GridDhtTxMapping();
 
-        if (!F.isEmpty(reads) || !F.isEmpty(writes)) {
-            for (int cacheId : tx.activeCacheIds()) {
-                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-                if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
-                    onDone(new ClusterTopologyServerNotFoundException("Failed 
to map keys for cache (all " +
-                        "partition nodes left the grid): " + cacheCtx.name()));
-
-                    return;
-                }
-            }
-        }
-
         Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> 
mappings = new HashMap<>();
 
         for (IgniteTxEntry write : writes)

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 0002180..2a6d940 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -296,19 +296,6 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
 
         Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>();
 
-        if (!F.isEmpty(writes)) {
-            for (int cacheId : tx.activeCacheIds()) {
-                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-                if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) {
-                    onDone(new ClusterTopologyServerNotFoundException("Failed 
to map keys for cache (all " +
-                        "partition nodes left the grid): " + cacheCtx.name()));
-
-                    return;
-                }
-            }
-        }
-
         // Assign keys to primary nodes.
         GridDistributedTxMapping cur = null;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index fd9183e..92fbfec 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -75,56 +75,14 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
      * @return Topology ready future.
      */
     protected final GridDhtTopologyFuture topologyReadLock() {
-        if (tx.activeCacheIds().isEmpty())
-            return cctx.exchange().lastTopologyFuture();
-
-        GridCacheContext<?, ?> nonLocCtx = null;
-
-        for (int cacheId : tx.activeCacheIds()) {
-            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-            if (!cacheCtx.isLocal()) {
-                nonLocCtx = cacheCtx;
-
-                break;
-            }
-        }
-
-        if (nonLocCtx == null)
-            return cctx.exchange().lastTopologyFuture();
-
-        nonLocCtx.topology().readLock();
-
-        if (nonLocCtx.topology().stopping()) {
-            onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
-                nonLocCtx.name()));
-
-            return null;
-        }
-
-        return nonLocCtx.topology().topologyVersionFuture();
+        return tx.txState().topologyReadLock(cctx, this);
     }
 
     /**
      * Releases topology read lock.
      */
     protected final void topologyReadUnlock() {
-        if (!tx.activeCacheIds().isEmpty()) {
-            GridCacheContext<?, ?> nonLocCtx = null;
-
-            for (int cacheId : tx.activeCacheIds()) {
-                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
-
-                if (!cacheCtx.isLocal()) {
-                    nonLocCtx = cacheCtx;
-
-                    break;
-                }
-            }
-
-            if (nonLocCtx != null)
-                nonLocCtx.topology().readUnlock();
-        }
+        tx.txState().topologyReadUnlock(cctx);
     }
 
     /**
@@ -160,28 +118,10 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
         }
 
         if (topVer != null) {
-            StringBuilder invalidCaches = null;
-
-            for (Integer cacheId : tx.activeCacheIds()) {
-                GridCacheContext ctx = cctx.cacheContext(cacheId);
-
-                assert ctx != null : cacheId;
-
-                Throwable err = topFut.validateCache(ctx);
-
-                if (err != null) {
-                    if (invalidCaches != null)
-                        invalidCaches.append(", ");
-                    else
-                        invalidCaches = new StringBuilder();
-
-                    invalidCaches.append(U.maskName(ctx.name()));
-                }
-            }
+            IgniteCheckedException err = tx.txState().validateTopology(cctx, 
topFut);
 
-            if (invalidCaches != null) {
-                onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache topology is not valid): " +
-                    invalidCaches.toString()));
+            if (err != null) {
+                onDone(err);
 
                 return;
             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
index a9dbda2..7b1ff05 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java
@@ -487,17 +487,7 @@ public final class GridNearTxFinishFuture<K, V> extends 
GridCompoundIdentityFutu
 
         assert tx.mappings().size() == 1;
 
-        boolean finish = false;
-
-        for (Integer cacheId : tx.activeCacheIds()) {
-            GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId);
-
-            if (cacheCtx.isNear()) {
-                finish = true;
-
-                break;
-            }
-        }
+        boolean finish = tx.txState().hasNearCache(cctx);
 
         if (finish) {
             GridDistributedTxMapping mapping = F.first(tx.mappings().values());

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index db4a4b8..472a081 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -280,15 +280,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter 
{
      * @return {@code True} if transaction is fully synchronous.
      */
     private boolean sync() {
-        if (super.syncCommit())
-            return true;
-
-        for (int cacheId : activeCacheIds()) {
-            if 
(cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC)
-                return true;
-        }
-
-        return false;
+        return super.syncCommit() || txState().sync(cctx);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
index 94af6bb..7819ed3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java
@@ -280,6 +280,11 @@ public interface IgniteInternalTx extends AutoCloseable, 
GridTimeoutObject {
     public Collection<Integer> activeCacheIds();
 
     /**
+     * @return Transaction state.
+     */
+    public IgniteTxState txState();
+
+    /**
      * @return {@code true} or {@code false} if the deployment is enabled or 
disabled for all active caches involved
      * in this transaction.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index eb2ca2c..ab400eb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -120,10 +120,6 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
     @GridToStringInclude
     protected boolean implicit;
 
-    /** Implicit with one key flag. */
-    @GridToStringInclude
-    protected boolean implicitSingle;
-
     /** Local flag. */
     @GridToStringInclude
     protected boolean loc;
@@ -251,6 +247,9 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
     @GridToStringExclude
     private TransactionProxyImpl proxy;
 
+    /** */
+    protected IgniteTxState txState;
+
     /**
      * Empty constructor required for {@link Externalizable}.
      */
@@ -295,7 +294,6 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         this.cctx = cctx;
         this.xidVer = xidVer;
         this.implicit = implicit;
-        this.implicitSingle = implicitSingle;
         this.loc = loc;
         this.sys = sys;
         this.plc = plc;
@@ -315,6 +313,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
 
         threadId = Thread.currentThread().getId();
 
+        txState = implicitSingle ? new IgniteTxImplicitStateImpl() : new 
IgniteTxStateImpl();
+
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, this);
     }
@@ -362,14 +362,20 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         this.taskNameHash = taskNameHash;
 
         implicit = false;
-        implicitSingle = false;
         loc = false;
 
+        txState = new IgniteTxStateImpl();
+
         if (log == null)
             log = U.logger(cctx.kernalContext(), logRef, this);
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteTxState txState() {
+        return txState;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean localResult() {
         assert originatingNodeId() != null;
 
@@ -421,45 +427,8 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
 
     /** {@inheritDoc} */
     @Override public boolean storeUsed() {
-        if (!storeEnabled())
-            return false;
-
-        Collection<Integer> cacheIds = activeCacheIds();
-
-        if (!cacheIds.isEmpty()) {
-            for (int cacheId : cacheIds) {
-                CacheStoreManager store = cctx.cacheContext(cacheId).store();
-
-                if (store.configured())
-                    return true;
-            }
-        }
+        return storeEnabled() && txState.storeUsed(cctx);
 
-        return false;
-    }
-
-    /**
-     * Store manager for current transaction.
-     *
-     * @return Store manager.
-     */
-    protected Collection<CacheStoreManager> stores() {
-        Collection<Integer> cacheIds = activeCacheIds();
-
-        if (!cacheIds.isEmpty()) {
-            Collection<CacheStoreManager> stores = new 
ArrayList<>(cacheIds.size());
-
-            for (int cacheId : cacheIds) {
-                CacheStoreManager store = cctx.cacheContext(cacheId).store();
-
-                if (store.configured())
-                    stores.add(store);
-            }
-
-            return stores;
-        }
-
-        return null;
     }
 
     /**
@@ -645,7 +614,7 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
 
     /** {@inheritDoc} */
     @Override public boolean implicitSingle() {
-        return implicitSingle;
+        return txState.implicitSingle();
     }
 
     /** {@inheritDoc} */
@@ -1877,6 +1846,11 @@ public abstract class IgniteTxAdapter extends 
GridMetadataAwareAdapter
         }
 
         /** {@inheritDoc} */
+        @Override public IgniteTxState txState() {
+            return null;
+        }
+
+        /** {@inheritDoc} */
         @Override public Collection<UUID> masterNodeIds() {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitStateImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitStateImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitStateImpl.java
new file mode 100644
index 0000000..26d442b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitStateImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteTxImplicitStateImpl extends IgniteTxStateAdapter {
+    /** */
+    private GridCacheContext cacheCtx;
+
+    /** {@inheritDoc} */
+    @Override public void addActiveCache(GridCacheContext ctx, 
IgniteTxLocalAdapter tx)
+        throws IgniteCheckedException {
+        assert cacheCtx == null : "Cache already set [cur=" + cacheCtx.name() 
+ ", new=" + ctx.name() + ']';
+
+        this.cacheCtx = ctx;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Integer firstCacheId() {
+        return cacheCtx != null ? cacheCtx.cacheId() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitLastFut(GridCacheSharedContext ctx) {
+        if (cacheCtx == null)
+            return;
+
+        cacheCtx.cache().awaitLastFut();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean implicitSingle() {
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException 
validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut) {
+        if (cacheCtx == null)
+            return null;
+
+        Throwable err = topFut.validateCache(cacheCtx);
+
+        if (err != null) {
+            return new IgniteCheckedException("Failed to perform cache 
operation (cache topology is not valid): " +
+                U.maskName(cacheCtx.name()));
+        }
+
+        if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) {
+            return new ClusterTopologyServerNotFoundException("Failed to map 
keys for cache (all " +
+                "partition nodes left the grid): " + cacheCtx.name());
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean sync(GridCacheSharedContext cctx) {
+        return cacheCtx != null && 
cacheCtx.config().getWriteSynchronizationMode() == FULL_SYNC;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
+        return cacheCtx != null && cacheCtx.isNear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtTopologyFuture 
topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) {
+        if (cacheCtx == null || cacheCtx.isLocal())
+            return cctx.exchange().lastTopologyFuture();
+
+        cacheCtx.topology().readLock();
+
+        if (cacheCtx.topology().stopping()) {
+            fut.onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
+                cacheCtx.name()));
+
+            return null;
+        }
+
+        return cacheCtx.topology().topologyVersionFuture();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void topologyReadUnlock(GridCacheSharedContext cctx) {
+        if (cacheCtx == null || cacheCtx.isLocal())
+            return;
+
+        cacheCtx.topology().readUnlock();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+        if (cacheCtx == null)
+            return false;
+
+        CacheStoreManager store = cacheCtx.store();
+
+        return store.configured();
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<CacheStoreManager> 
stores(GridCacheSharedContext cctx) {
+        if (cacheCtx == null)
+            return null;
+
+        CacheStoreManager store = cacheCtx.store();
+
+        if (store.configured())
+            return Collections.singleton(store);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTxEnd(GridCacheSharedContext cctx, 
IgniteInternalTx tx, boolean commit) {
+        if (cacheCtx != null)
+            onTxEnd(cacheCtx, tx, commit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
index ada2538..57485fe 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java
@@ -284,6 +284,13 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
         return depEnabled;
     }
 
+    /**
+     * @param depEnabled Flag indicating whether deployment is enabled for 
caches from this transaction or not.
+     */
+    public void activeCachesDeploymentEnabled(boolean depEnabled) {
+        this.depEnabled = depEnabled;
+    }
+
     /** {@inheritDoc} */
     @Override public boolean isStarted() {
         return txMap != null;
@@ -652,7 +659,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
         if (!storeEnabled() || internal())
             return;
 
-        Collection<CacheStoreManager> stores = stores();
+        Collection<CacheStoreManager> stores = txState.stores(cctx);
 
         if (stores == null || stores.isEmpty())
             return;
@@ -1282,7 +1289,7 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
                 cctx.tm().rollbackTx(this);
 
                 if (!internal()) {
-                    Collection<CacheStoreManager> stores = stores();
+                    Collection<CacheStoreManager> stores = 
txState.stores(cctx);
 
                     if (stores != null && !stores.isEmpty()) {
                         assert isWriteToStoreFromDhtValid(stores) :
@@ -3457,38 +3464,8 @@ public abstract class IgniteTxLocalAdapter extends 
IgniteTxAdapter
      * @throws IgniteCheckedException If caches already enlisted in this 
transaction are not compatible with given
      *      cache (e.g. they have different stores).
      */
-    protected void addActiveCache(GridCacheContext cacheCtx) throws 
IgniteCheckedException {
-        int cacheId = cacheCtx.cacheId();
-
-        // Check if we can enlist new cache to transaction.
-        if (!activeCacheIds.contains(cacheId)) {
-            String err = cctx.verifyTxCompatibility(this, activeCacheIds, 
cacheCtx);
-
-            if (err != null) {
-                StringBuilder cacheNames = new StringBuilder();
-
-                int idx = 0;
-
-                for (Integer activeCacheId : activeCacheIds) {
-                    cacheNames.append(cctx.cacheContext(activeCacheId).name());
-
-                    if (idx++ < activeCacheIds.size() - 1)
-                        cacheNames.append(", ");
-                }
-
-                throw new IgniteCheckedException("Failed to enlist new cache 
to existing transaction (" +
-                    err +
-                    ") [activeCaches=[" + cacheNames + "]" +
-                    ", cacheName=" + cacheCtx.name() +
-                    ", cacheSystem=" + cacheCtx.systemTx() +
-                    ", txSystem=" + system() + ']');
-            }
-            else
-                activeCacheIds.add(cacheId);
-
-            if (activeCacheIds.size() == 1)
-                depEnabled = cacheCtx.deploymentEnabled();
-        }
+    protected final void addActiveCache(GridCacheContext cacheCtx) throws 
IgniteCheckedException {
+        txState().addActiveCache(cacheCtx, this);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index c2e7dea..ccccca0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1092,13 +1092,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                 if (!tx.system())
                     cctx.txMetrics().onTxCommit();
 
-                for (int cacheId : tx.activeCacheIds()) {
-                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
-
-                    if (cacheCtx.cache().configuration().isStatisticsEnabled())
-                        // Convert start time from ms to ns.
-                        
cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() - tx.startTime()) 
* 1000);
-                }
+                tx.txState().onTxEnd(cctx, tx, true);
             }
 
             if (slowTxWarnTimeout > 0 && tx.local() &&
@@ -1163,13 +1157,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                 if (!tx.system())
                     cctx.txMetrics().onTxRollback();
 
-                for (int cacheId : tx.activeCacheIds()) {
-                    GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
-
-                    if (cacheCtx.cache().configuration().isStatisticsEnabled())
-                        // Convert start time from ms to ns.
-                        
cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - 
tx.startTime()) * 1000);
-                }
+                tx.txState().onTxEnd(cctx, tx, false);
             }
 
             if (log.isDebugEnabled())
@@ -1233,7 +1221,7 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
             if (!tx.system())
                 threadMap.remove(tx.threadId(), tx);
             else {
-                Integer cacheId = F.first(tx.activeCacheIds());
+                Integer cacheId = tx.txState().firstCacheId();
 
                 if (cacheId != null)
                     sysThreadMap.remove(new TxThreadKey(tx.threadId(), 
cacheId), tx);

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
new file mode 100644
index 0000000..856ae04
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.Collection;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public interface IgniteTxState {
+    public boolean implicitSingle();
+
+    @Nullable public Integer firstCacheId();
+
+    public void awaitLastFut(GridCacheSharedContext cctx);
+
+    public IgniteCheckedException validateTopology(GridCacheSharedContext 
cctx, GridDhtTopologyFuture topFut);
+
+    public boolean sync(GridCacheSharedContext cctx);
+
+    public boolean hasNearCache(GridCacheSharedContext cctx);
+
+    public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter 
tx) throws IgniteCheckedException;
+
+    public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, 
GridFutureAdapter<?> fut);
+
+    public void topologyReadUnlock(GridCacheSharedContext cctx);
+
+    public boolean storeUsed(GridCacheSharedContext cctx);
+
+    public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx);
+
+    public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, 
boolean commit);
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateAdapter.java
new file mode 100644
index 0000000..739bae4
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateAdapter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ *
+ */
+public abstract class IgniteTxStateAdapter implements IgniteTxState {
+    /**
+     * @param cacheCtx Cache context.
+     * @param tx Transaction.
+     * @param commit {@code False} if transaction rolled back.
+     */
+    protected final void onTxEnd(GridCacheContext cacheCtx, IgniteInternalTx 
tx, boolean commit) {
+        if (cacheCtx.cache().configuration().isStatisticsEnabled()) {
+            // Convert start time from ms to ns.
+            if (commit)
+                cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() 
- tx.startTime()) * 1000);
+            else
+                
cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - 
tx.startTime()) * 1000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
new file mode 100644
index 0000000..c731560
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.transactions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.IgniteCheckedException;
+import 
org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
+import org.apache.ignite.internal.processors.cache.store.CacheStoreManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+public class IgniteTxStateImpl extends IgniteTxStateAdapter {
+    /** Active cache IDs. */
+    private Set<Integer> activeCacheIds = new HashSet<>();
+
+    /** {@inheritDoc} */
+    @Override public boolean implicitSingle() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Integer firstCacheId() {
+        return F.first(activeCacheIds);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void awaitLastFut(GridCacheSharedContext cctx) {
+        if (activeCacheIds.size() > 1) {
+            for (Integer cacheId : activeCacheIds)
+                cctx.cacheContext(cacheId).cache().awaitLastFut();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCheckedException 
validateTopology(GridCacheSharedContext cctx,
+        GridDhtTopologyFuture topFut) {
+        StringBuilder invalidCaches = null;
+
+        for (Integer cacheId : activeCacheIds) {
+            GridCacheContext ctx = cctx.cacheContext(cacheId);
+
+            assert ctx != null : cacheId;
+
+            Throwable err = topFut.validateCache(ctx);
+
+            if (err != null) {
+                if (invalidCaches != null)
+                    invalidCaches.append(", ");
+                else
+                    invalidCaches = new StringBuilder();
+
+                invalidCaches.append(U.maskName(ctx.name()));
+            }
+        }
+
+        if (invalidCaches != null) {
+            return new IgniteCheckedException("Failed to perform cache 
operation (cache topology is not valid): " +
+                invalidCaches.toString());
+        }
+
+        for (int cacheId : activeCacheIds) {
+            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+            if (CU.affinityNodes(cacheCtx, 
topFut.topologyVersion()).isEmpty()) {
+                return new ClusterTopologyServerNotFoundException("Failed to 
map keys for cache (all " +
+                    "partition nodes left the grid): " + cacheCtx.name());
+            }
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean sync(GridCacheSharedContext cctx) {
+        for (int cacheId : activeCacheIds) {
+            if 
(cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC)
+                return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean hasNearCache(GridCacheSharedContext cctx) {
+        for (Integer cacheId : activeCacheIds) {
+            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+            if (cacheCtx.isNear())
+                return true;
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addActiveCache(GridCacheContext cacheCtx, 
IgniteTxLocalAdapter tx)
+        throws IgniteCheckedException {
+        GridCacheSharedContext cctx = cacheCtx.shared();
+
+        int cacheId = cacheCtx.cacheId();
+
+        // Check if we can enlist new cache to transaction.
+        if (!activeCacheIds.contains(cacheId)) {
+            String err = cctx.verifyTxCompatibility(tx, activeCacheIds, 
cacheCtx);
+
+            if (err != null) {
+                StringBuilder cacheNames = new StringBuilder();
+
+                int idx = 0;
+
+                for (Integer activeCacheId : activeCacheIds) {
+                    cacheNames.append(cctx.cacheContext(activeCacheId).name());
+
+                    if (idx++ < activeCacheIds.size() - 1)
+                        cacheNames.append(", ");
+                }
+
+                throw new IgniteCheckedException("Failed to enlist new cache 
to existing transaction (" +
+                    err +
+                    ") [activeCaches=[" + cacheNames + "]" +
+                    ", cacheName=" + cacheCtx.name() +
+                    ", cacheSystem=" + cacheCtx.systemTx() +
+                    ", txSystem=" + tx.system() + ']');
+            }
+            else
+                activeCacheIds.add(cacheId);
+
+            if (activeCacheIds.size() == 1)
+                tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled());
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridDhtTopologyFuture 
topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) {
+        if (activeCacheIds.isEmpty())
+            return cctx.exchange().lastTopologyFuture();
+
+        GridCacheContext<?, ?> nonLocCtx = null;
+
+        for (int cacheId : activeCacheIds) {
+            GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+            if (!cacheCtx.isLocal()) {
+                nonLocCtx = cacheCtx;
+
+                break;
+            }
+        }
+
+        if (nonLocCtx == null)
+            return cctx.exchange().lastTopologyFuture();
+
+        nonLocCtx.topology().readLock();
+
+        if (nonLocCtx.topology().stopping()) {
+            fut.onDone(new IgniteCheckedException("Failed to perform cache 
operation (cache is stopped): " +
+                nonLocCtx.name()));
+
+            return null;
+        }
+
+        return nonLocCtx.topology().topologyVersionFuture();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void topologyReadUnlock(GridCacheSharedContext cctx) {
+        if (!activeCacheIds.isEmpty()) {
+            GridCacheContext<?, ?> nonLocCtx = null;
+
+            for (int cacheId : activeCacheIds) {
+                GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
+
+                if (!cacheCtx.isLocal()) {
+                    nonLocCtx = cacheCtx;
+
+                    break;
+                }
+            }
+
+            if (nonLocCtx != null)
+                nonLocCtx.topology().readUnlock();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean storeUsed(GridCacheSharedContext cctx) {
+        if (!activeCacheIds.isEmpty()) {
+            for (int cacheId : activeCacheIds) {
+                CacheStoreManager store = cctx.cacheContext(cacheId).store();
+
+                if (store.configured())
+                    return true;
+            }
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<CacheStoreManager> 
stores(GridCacheSharedContext cctx) {
+        Collection<Integer> cacheIds = activeCacheIds;
+
+        if (!cacheIds.isEmpty()) {
+            Collection<CacheStoreManager> stores = new 
ArrayList<>(cacheIds.size());
+
+            for (int cacheId : cacheIds) {
+                CacheStoreManager store = cctx.cacheContext(cacheId).store();
+
+                if (store.configured())
+                    stores.add(store);
+            }
+
+            return stores;
+        }
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTxEnd(GridCacheSharedContext cctx, 
IgniteInternalTx tx, boolean commit) {
+        for (int cacheId : activeCacheIds) {
+            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
+
+            onTxEnd(cacheCtx, tx, commit);
+        }
+    }
+}

Reply via email to