Repository: ignite Updated Branches: refs/heads/ignite-single-op-tx f09d09fe5 -> dded8563d
'Single' operations optimizations for tx cache. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dded8563 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dded8563 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dded8563 Branch: refs/heads/ignite-single-op-tx Commit: dded8563db0030d016669767f9afbe9199b158dd Parents: f09d09f Author: sboikov <sboi...@gridgain.com> Authored: Fri Nov 13 16:31:10 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Fri Nov 13 16:31:10 2015 +0300 ---------------------------------------------------------------------- .../cache/GridCacheSharedContext.java | 30 +-- ...arOptimisticSerializableTxPrepareFuture.java | 13 - .../near/GridNearOptimisticTxPrepareFuture.java | 13 - ...ridNearOptimisticTxPrepareFutureAdapter.java | 70 +---- .../near/GridNearTxFinishFuture.java | 12 +- .../cache/distributed/near/GridNearTxLocal.java | 10 +- .../cache/transactions/IgniteInternalTx.java | 5 + .../cache/transactions/IgniteTxAdapter.java | 64 ++--- .../transactions/IgniteTxImplicitStateImpl.java | 151 +++++++++++ .../transactions/IgniteTxLocalAdapter.java | 45 +--- .../cache/transactions/IgniteTxManager.java | 18 +- .../cache/transactions/IgniteTxState.java | 56 ++++ .../transactions/IgniteTxStateAdapter.java | 41 +++ .../cache/transactions/IgniteTxStateImpl.java | 256 +++++++++++++++++++ 14 files changed, 553 insertions(+), 231 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index b37742c..5321bb3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -582,12 +582,7 @@ public class GridCacheSharedContext<K, V> { * @throws IgniteCheckedException If failed. */ public void endTx(IgniteInternalTx tx) throws IgniteCheckedException { - Collection<Integer> cacheIds = tx.activeCacheIds(); - - if (!cacheIds.isEmpty()) { - for (Integer cacheId : cacheIds) - cacheContext(cacheId).cache().awaitLastFut(); - } + tx.txState().awaitLastFut(this); tx.close(); } @@ -597,21 +592,9 @@ public class GridCacheSharedContext<K, V> { * @return Commit future. */ public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(IgniteInternalTx tx) { - Collection<Integer> cacheIds = tx.activeCacheIds(); - - if (cacheIds.isEmpty()) - return tx.commitAsync(); - else if (cacheIds.size() == 1) { - int cacheId = F.first(cacheIds); - - return cacheContext(cacheId).cache().commitTxAsync(tx); - } - else { - for (Integer cacheId : cacheIds) - cacheContext(cacheId).cache().awaitLastFut(); + tx.txState().awaitLastFut(this); - return tx.commitAsync(); - } + return tx.commitAsync(); } /** @@ -620,12 +603,7 @@ public class GridCacheSharedContext<K, V> { * @return Rollback future. */ public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException { - Collection<Integer> cacheIds = tx.activeCacheIds(); - - if (!cacheIds.isEmpty()) { - for (Integer cacheId : cacheIds) - cacheContext(cacheId).cache().awaitLastFut(); - } + tx.txState().awaitLastFut(this); return tx.rollbackAsync(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 5488bb1..0ff8eae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -326,19 +326,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim txMapping = new GridDhtTxMapping(); - if (!F.isEmpty(reads) || !F.isEmpty(writes)) { - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + - "partition nodes left the grid): " + cacheCtx.name())); - - return; - } - } - } - Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); for (IgniteTxEntry write : writes) http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 0002180..2a6d940 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -296,19 +296,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>(); - if (!F.isEmpty(writes)) { - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + - "partition nodes left the grid): " + cacheCtx.name())); - - return; - } - } - } - // Assign keys to primary nodes. GridDistributedTxMapping cur = null; http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index fd9183e..92fbfec 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -75,56 +75,14 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT * @return Topology ready future. */ protected final GridDhtTopologyFuture topologyReadLock() { - if (tx.activeCacheIds().isEmpty()) - return cctx.exchange().lastTopologyFuture(); - - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx == null) - return cctx.exchange().lastTopologyFuture(); - - nonLocCtx.topology().readLock(); - - if (nonLocCtx.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - nonLocCtx.name())); - - return null; - } - - return nonLocCtx.topology().topologyVersionFuture(); + return tx.txState().topologyReadLock(cctx, this); } /** * Releases topology read lock. */ protected final void topologyReadUnlock() { - if (!tx.activeCacheIds().isEmpty()) { - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx != null) - nonLocCtx.topology().readUnlock(); - } + tx.txState().topologyReadUnlock(cctx); } /** @@ -160,28 +118,10 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT } if (topVer != null) { - StringBuilder invalidCaches = null; - - for (Integer cacheId : tx.activeCacheIds()) { - GridCacheContext ctx = cctx.cacheContext(cacheId); - - assert ctx != null : cacheId; - - Throwable err = topFut.validateCache(ctx); - - if (err != null) { - if (invalidCaches != null) - invalidCaches.append(", "); - else - invalidCaches = new StringBuilder(); - - invalidCaches.append(U.maskName(ctx.name())); - } - } + IgniteCheckedException err = tx.txState().validateTopology(cctx, topFut); - if (invalidCaches != null) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - invalidCaches.toString())); + if (err != null) { + onDone(err); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index a9dbda2..7b1ff05 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -487,17 +487,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu assert tx.mappings().size() == 1; - boolean finish = false; - - for (Integer cacheId : tx.activeCacheIds()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - if (cacheCtx.isNear()) { - finish = true; - - break; - } - } + boolean finish = tx.txState().hasNearCache(cctx); if (finish) { GridDistributedTxMapping mapping = F.first(tx.mappings().values()); http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index db4a4b8..472a081 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -280,15 +280,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { * @return {@code True} if transaction is fully synchronous. */ private boolean sync() { - if (super.syncCommit()) - return true; - - for (int cacheId : activeCacheIds()) { - if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC) - return true; - } - - return false; + return super.syncCommit() || txState().sync(cctx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 94af6bb..7819ed3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -280,6 +280,11 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { public Collection<Integer> activeCacheIds(); /** + * @return Transaction state. + */ + public IgniteTxState txState(); + + /** * @return {@code true} or {@code false} if the deployment is enabled or disabled for all active caches involved * in this transaction. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index eb2ca2c..ab400eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -120,10 +120,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter @GridToStringInclude protected boolean implicit; - /** Implicit with one key flag. */ - @GridToStringInclude - protected boolean implicitSingle; - /** Local flag. */ @GridToStringInclude protected boolean loc; @@ -251,6 +247,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter @GridToStringExclude private TransactionProxyImpl proxy; + /** */ + protected IgniteTxState txState; + /** * Empty constructor required for {@link Externalizable}. */ @@ -295,7 +294,6 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter this.cctx = cctx; this.xidVer = xidVer; this.implicit = implicit; - this.implicitSingle = implicitSingle; this.loc = loc; this.sys = sys; this.plc = plc; @@ -315,6 +313,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter threadId = Thread.currentThread().getId(); + txState = implicitSingle ? new IgniteTxImplicitStateImpl() : new IgniteTxStateImpl(); + if (log == null) log = U.logger(cctx.kernalContext(), logRef, this); } @@ -362,14 +362,20 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter this.taskNameHash = taskNameHash; implicit = false; - implicitSingle = false; loc = false; + txState = new IgniteTxStateImpl(); + if (log == null) log = U.logger(cctx.kernalContext(), logRef, this); } /** {@inheritDoc} */ + @Override public IgniteTxState txState() { + return txState; + } + + /** {@inheritDoc} */ @Override public boolean localResult() { assert originatingNodeId() != null; @@ -421,45 +427,8 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public boolean storeUsed() { - if (!storeEnabled()) - return false; - - Collection<Integer> cacheIds = activeCacheIds(); - - if (!cacheIds.isEmpty()) { - for (int cacheId : cacheIds) { - CacheStoreManager store = cctx.cacheContext(cacheId).store(); - - if (store.configured()) - return true; - } - } + return storeEnabled() && txState.storeUsed(cctx); - return false; - } - - /** - * Store manager for current transaction. - * - * @return Store manager. - */ - protected Collection<CacheStoreManager> stores() { - Collection<Integer> cacheIds = activeCacheIds(); - - if (!cacheIds.isEmpty()) { - Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size()); - - for (int cacheId : cacheIds) { - CacheStoreManager store = cctx.cacheContext(cacheId).store(); - - if (store.configured()) - stores.add(store); - } - - return stores; - } - - return null; } /** @@ -645,7 +614,7 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter /** {@inheritDoc} */ @Override public boolean implicitSingle() { - return implicitSingle; + return txState.implicitSingle(); } /** {@inheritDoc} */ @@ -1877,6 +1846,11 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter } /** {@inheritDoc} */ + @Override public IgniteTxState txState() { + return null; + } + + /** {@inheritDoc} */ @Override public Collection<UUID> masterNodeIds() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitStateImpl.java new file mode 100644 index 0000000..26d442b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxImplicitStateImpl.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import java.util.Collections; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteTxImplicitStateImpl extends IgniteTxStateAdapter { + /** */ + private GridCacheContext cacheCtx; + + /** {@inheritDoc} */ + @Override public void addActiveCache(GridCacheContext ctx, IgniteTxLocalAdapter tx) + throws IgniteCheckedException { + assert cacheCtx == null : "Cache already set [cur=" + cacheCtx.name() + ", new=" + ctx.name() + ']'; + + this.cacheCtx = ctx; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer firstCacheId() { + return cacheCtx != null ? cacheCtx.cacheId() : null; + } + + /** {@inheritDoc} */ + @Override public void awaitLastFut(GridCacheSharedContext ctx) { + if (cacheCtx == null) + return; + + cacheCtx.cache().awaitLastFut(); + } + + /** {@inheritDoc} */ + @Override public boolean implicitSingle() { + return true; + } + + /** {@inheritDoc} */ + @Override public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut) { + if (cacheCtx == null) + return null; + + Throwable err = topFut.validateCache(cacheCtx); + + if (err != null) { + return new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + U.maskName(cacheCtx.name())); + } + + if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) { + return new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name()); + } + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean sync(GridCacheSharedContext cctx) { + return cacheCtx != null && cacheCtx.config().getWriteSynchronizationMode() == FULL_SYNC; + } + + /** {@inheritDoc} */ + @Override public boolean hasNearCache(GridCacheSharedContext cctx) { + return cacheCtx != null && cacheCtx.isNear(); + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) { + if (cacheCtx == null || cacheCtx.isLocal()) + return cctx.exchange().lastTopologyFuture(); + + cacheCtx.topology().readLock(); + + if (cacheCtx.topology().stopping()) { + fut.onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + cacheCtx.name())); + + return null; + } + + return cacheCtx.topology().topologyVersionFuture(); + } + + /** {@inheritDoc} */ + @Override public void topologyReadUnlock(GridCacheSharedContext cctx) { + if (cacheCtx == null || cacheCtx.isLocal()) + return; + + cacheCtx.topology().readUnlock(); + } + + /** {@inheritDoc} */ + @Override public boolean storeUsed(GridCacheSharedContext cctx) { + if (cacheCtx == null) + return false; + + CacheStoreManager store = cacheCtx.store(); + + return store.configured(); + } + + /** {@inheritDoc} */ + @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) { + if (cacheCtx == null) + return null; + + CacheStoreManager store = cacheCtx.store(); + + if (store.configured()) + return Collections.singleton(store); + + return null; + } + + /** {@inheritDoc} */ + @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) { + if (cacheCtx != null) + onTxEnd(cacheCtx, tx, commit); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index ada2538..57485fe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -284,6 +284,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return depEnabled; } + /** + * @param depEnabled Flag indicating whether deployment is enabled for caches from this transaction or not. + */ + public void activeCachesDeploymentEnabled(boolean depEnabled) { + this.depEnabled = depEnabled; + } + /** {@inheritDoc} */ @Override public boolean isStarted() { return txMap != null; @@ -652,7 +659,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!storeEnabled() || internal()) return; - Collection<CacheStoreManager> stores = stores(); + Collection<CacheStoreManager> stores = txState.stores(cctx); if (stores == null || stores.isEmpty()) return; @@ -1282,7 +1289,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().rollbackTx(this); if (!internal()) { - Collection<CacheStoreManager> stores = stores(); + Collection<CacheStoreManager> stores = txState.stores(cctx); if (stores != null && !stores.isEmpty()) { assert isWriteToStoreFromDhtValid(stores) : @@ -3457,38 +3464,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @throws IgniteCheckedException If caches already enlisted in this transaction are not compatible with given * cache (e.g. they have different stores). */ - protected void addActiveCache(GridCacheContext cacheCtx) throws IgniteCheckedException { - int cacheId = cacheCtx.cacheId(); - - // Check if we can enlist new cache to transaction. - if (!activeCacheIds.contains(cacheId)) { - String err = cctx.verifyTxCompatibility(this, activeCacheIds, cacheCtx); - - if (err != null) { - StringBuilder cacheNames = new StringBuilder(); - - int idx = 0; - - for (Integer activeCacheId : activeCacheIds) { - cacheNames.append(cctx.cacheContext(activeCacheId).name()); - - if (idx++ < activeCacheIds.size() - 1) - cacheNames.append(", "); - } - - throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" + - err + - ") [activeCaches=[" + cacheNames + "]" + - ", cacheName=" + cacheCtx.name() + - ", cacheSystem=" + cacheCtx.systemTx() + - ", txSystem=" + system() + ']'); - } - else - activeCacheIds.add(cacheId); - - if (activeCacheIds.size() == 1) - depEnabled = cacheCtx.deploymentEnabled(); - } + protected final void addActiveCache(GridCacheContext cacheCtx) throws IgniteCheckedException { + txState().addActiveCache(cacheCtx, this); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c2e7dea..ccccca0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1092,13 +1092,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (!tx.system()) cctx.txMetrics().onTxCommit(); - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - - if (cacheCtx.cache().configuration().isStatisticsEnabled()) - // Convert start time from ms to ns. - cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() - tx.startTime()) * 1000); - } + tx.txState().onTxEnd(cctx, tx, true); } if (slowTxWarnTimeout > 0 && tx.local() && @@ -1163,13 +1157,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (!tx.system()) cctx.txMetrics().onTxRollback(); - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - - if (cacheCtx.cache().configuration().isStatisticsEnabled()) - // Convert start time from ms to ns. - cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - tx.startTime()) * 1000); - } + tx.txState().onTxEnd(cctx, tx, false); } if (log.isDebugEnabled()) @@ -1233,7 +1221,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (!tx.system()) threadMap.remove(tx.threadId(), tx); else { - Integer cacheId = F.first(tx.activeCacheIds()); + Integer cacheId = tx.txState().firstCacheId(); if (cacheId != null) sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java new file mode 100644 index 0000000..856ae04 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxState.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Collection; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public interface IgniteTxState { + public boolean implicitSingle(); + + @Nullable public Integer firstCacheId(); + + public void awaitLastFut(GridCacheSharedContext cctx); + + public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, GridDhtTopologyFuture topFut); + + public boolean sync(GridCacheSharedContext cctx); + + public boolean hasNearCache(GridCacheSharedContext cctx); + + public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter tx) throws IgniteCheckedException; + + public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut); + + public void topologyReadUnlock(GridCacheSharedContext cctx); + + public boolean storeUsed(GridCacheSharedContext cctx); + + public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx); + + public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateAdapter.java new file mode 100644 index 0000000..739bae4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * + */ +public abstract class IgniteTxStateAdapter implements IgniteTxState { + /** + * @param cacheCtx Cache context. + * @param tx Transaction. + * @param commit {@code False} if transaction rolled back. + */ + protected final void onTxEnd(GridCacheContext cacheCtx, IgniteInternalTx tx, boolean commit) { + if (cacheCtx.cache().configuration().isStatisticsEnabled()) { + // Convert start time from ms to ns. + if (commit) + cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() - tx.startTime()) * 1000); + else + cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - tx.startTime()) * 1000); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/dded8563/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java new file mode 100644 index 0000000..c731560 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; +import org.apache.ignite.internal.processors.cache.store.CacheStoreManager; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class IgniteTxStateImpl extends IgniteTxStateAdapter { + /** Active cache IDs. */ + private Set<Integer> activeCacheIds = new HashSet<>(); + + /** {@inheritDoc} */ + @Override public boolean implicitSingle() { + return false; + } + + /** {@inheritDoc} */ + @Nullable @Override public Integer firstCacheId() { + return F.first(activeCacheIds); + } + + /** {@inheritDoc} */ + @Override public void awaitLastFut(GridCacheSharedContext cctx) { + if (activeCacheIds.size() > 1) { + for (Integer cacheId : activeCacheIds) + cctx.cacheContext(cacheId).cache().awaitLastFut(); + } + } + + /** {@inheritDoc} */ + @Override public IgniteCheckedException validateTopology(GridCacheSharedContext cctx, + GridDhtTopologyFuture topFut) { + StringBuilder invalidCaches = null; + + for (Integer cacheId : activeCacheIds) { + GridCacheContext ctx = cctx.cacheContext(cacheId); + + assert ctx != null : cacheId; + + Throwable err = topFut.validateCache(ctx); + + if (err != null) { + if (invalidCaches != null) + invalidCaches.append(", "); + else + invalidCaches = new StringBuilder(); + + invalidCaches.append(U.maskName(ctx.name())); + } + } + + if (invalidCaches != null) { + return new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + + invalidCaches.toString()); + } + + for (int cacheId : activeCacheIds) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (CU.affinityNodes(cacheCtx, topFut.topologyVersion()).isEmpty()) { + return new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + + "partition nodes left the grid): " + cacheCtx.name()); + } + } + + return null; + } + + /** {@inheritDoc} */ + @Override public boolean sync(GridCacheSharedContext cctx) { + for (int cacheId : activeCacheIds) { + if (cctx.cacheContext(cacheId).config().getWriteSynchronizationMode() == FULL_SYNC) + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean hasNearCache(GridCacheSharedContext cctx) { + for (Integer cacheId : activeCacheIds) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + if (cacheCtx.isNear()) + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public void addActiveCache(GridCacheContext cacheCtx, IgniteTxLocalAdapter tx) + throws IgniteCheckedException { + GridCacheSharedContext cctx = cacheCtx.shared(); + + int cacheId = cacheCtx.cacheId(); + + // Check if we can enlist new cache to transaction. + if (!activeCacheIds.contains(cacheId)) { + String err = cctx.verifyTxCompatibility(tx, activeCacheIds, cacheCtx); + + if (err != null) { + StringBuilder cacheNames = new StringBuilder(); + + int idx = 0; + + for (Integer activeCacheId : activeCacheIds) { + cacheNames.append(cctx.cacheContext(activeCacheId).name()); + + if (idx++ < activeCacheIds.size() - 1) + cacheNames.append(", "); + } + + throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" + + err + + ") [activeCaches=[" + cacheNames + "]" + + ", cacheName=" + cacheCtx.name() + + ", cacheSystem=" + cacheCtx.systemTx() + + ", txSystem=" + tx.system() + ']'); + } + else + activeCacheIds.add(cacheId); + + if (activeCacheIds.size() == 1) + tx.activeCachesDeploymentEnabled(cacheCtx.deploymentEnabled()); + } + } + + /** {@inheritDoc} */ + @Override public GridDhtTopologyFuture topologyReadLock(GridCacheSharedContext cctx, GridFutureAdapter<?> fut) { + if (activeCacheIds.isEmpty()) + return cctx.exchange().lastTopologyFuture(); + + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : activeCacheIds) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx == null) + return cctx.exchange().lastTopologyFuture(); + + nonLocCtx.topology().readLock(); + + if (nonLocCtx.topology().stopping()) { + fut.onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + + nonLocCtx.name())); + + return null; + } + + return nonLocCtx.topology().topologyVersionFuture(); + } + + /** {@inheritDoc} */ + @Override public void topologyReadUnlock(GridCacheSharedContext cctx) { + if (!activeCacheIds.isEmpty()) { + GridCacheContext<?, ?> nonLocCtx = null; + + for (int cacheId : activeCacheIds) { + GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); + + if (!cacheCtx.isLocal()) { + nonLocCtx = cacheCtx; + + break; + } + } + + if (nonLocCtx != null) + nonLocCtx.topology().readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public boolean storeUsed(GridCacheSharedContext cctx) { + if (!activeCacheIds.isEmpty()) { + for (int cacheId : activeCacheIds) { + CacheStoreManager store = cctx.cacheContext(cacheId).store(); + + if (store.configured()) + return true; + } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public Collection<CacheStoreManager> stores(GridCacheSharedContext cctx) { + Collection<Integer> cacheIds = activeCacheIds; + + if (!cacheIds.isEmpty()) { + Collection<CacheStoreManager> stores = new ArrayList<>(cacheIds.size()); + + for (int cacheId : cacheIds) { + CacheStoreManager store = cctx.cacheContext(cacheId).store(); + + if (store.configured()) + stores.add(store); + } + + return stores; + } + + return null; + } + + /** {@inheritDoc} */ + @Override public void onTxEnd(GridCacheSharedContext cctx, IgniteInternalTx tx, boolean commit) { + for (int cacheId : activeCacheIds) { + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); + + onTxEnd(cacheCtx, tx, commit); + } + } +}