'Single' operations optimizations for tx cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3ff71fd7 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3ff71fd7 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3ff71fd7 Branch: refs/heads/ignite-direct-marsh-opt Commit: 3ff71fd742efda91fef6ed6d92dac11cc6d00976 Parents: 38e66d9 Author: sboikov <sboi...@gridgain.com> Authored: Tue Nov 17 19:38:05 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Nov 17 19:38:05 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 48 +- .../processors/cache/GridCacheMapEntry.java | 11 +- .../cache/GridCacheSharedContext.java | 31 +- .../distributed/GridDistributedTxMapping.java | 78 - .../GridDistributedTxRemoteAdapter.java | 136 +- .../dht/GridDhtTransactionalCacheAdapter.java | 1 - .../cache/distributed/dht/GridDhtTxLocal.java | 24 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 80 +- .../cache/distributed/dht/GridDhtTxMapping.java | 134 +- .../distributed/dht/GridDhtTxPrepareFuture.java | 59 +- .../cache/distributed/dht/GridDhtTxRemote.java | 29 +- ...arOptimisticSerializableTxPrepareFuture.java | 16 - .../near/GridNearOptimisticTxPrepareFuture.java | 103 +- ...ridNearOptimisticTxPrepareFutureAdapter.java | 72 +- .../GridNearPessimisticTxPrepareFuture.java | 1 - .../near/GridNearTransactionalCache.java | 1 - .../near/GridNearTxFinishFuture.java | 61 +- .../cache/distributed/near/GridNearTxLocal.java | 109 +- .../near/GridNearTxPrepareRequest.java | 9 - .../distributed/near/GridNearTxRemote.java | 26 +- .../distributed/near/IgniteTxMappings.java | 75 + .../distributed/near/IgniteTxMappingsImpl.java | 92 ++ .../near/IgniteTxMappingsSingleImpl.java | 101 ++ .../cache/transactions/IgniteInternalTx.java | 12 +- .../cache/transactions/IgniteTxAdapter.java | 67 +- .../cache/transactions/IgniteTxHandler.java | 16 +- .../IgniteTxImplicitSingleStateImpl.java | 259 ++++ .../transactions/IgniteTxLocalAdapter.java | 1388 ++++++++++-------- .../cache/transactions/IgniteTxLocalEx.java | 30 +- .../cache/transactions/IgniteTxLocalState.java | 44 + .../transactions/IgniteTxLocalStateAdapter.java | 41 + .../cache/transactions/IgniteTxManager.java | 18 +- .../cache/transactions/IgniteTxMap.java | 3 +- .../cache/transactions/IgniteTxRemoteEx.java | 17 +- .../IgniteTxRemoteSingleStateImpl.java | 108 ++ .../cache/transactions/IgniteTxRemoteState.java | 34 + .../IgniteTxRemoteStateAdapter.java | 110 ++ .../transactions/IgniteTxRemoteStateImpl.java | 124 ++ .../cache/transactions/IgniteTxState.java | 171 +++ .../cache/transactions/IgniteTxStateImpl.java | 375 +++++ 40 files changed, 2709 insertions(+), 1405 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 419ccec..cbb7486 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1854,7 +1854,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V V prevVal = syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAllAsync(ctx, F.t(key, val), true, filter).get().value(); + return (V)tx.putAsync(ctx, key, val, true, filter).get().value(); } @Override public String toString() { @@ -1909,7 +1909,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), true, filter) + return tx.putAsync(ctx, key, val, true, filter) .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @@ -1948,7 +1948,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Boolean stored = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAllAsync(ctx, F.t(key, val), false, filter).get().success(); + return tx.putAsync(ctx, key, val, false, filter).get().success(); } @Override public String toString() { @@ -2013,10 +2013,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { @Nullable @Override public EntryProcessorResult<T> op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = - Collections.singletonMap(key, (EntryProcessor<K, V, Object>) entryProcessor); - - IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, invokeMap, args); + IgniteInternalFuture<GridCacheReturn> fut = + tx.invokeAsync(ctx, key, (EntryProcessor<K, V, Object>)entryProcessor, args); Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); @@ -2240,8 +2238,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), false, filter).chain( - (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); + return tx.putAsync(ctx, key, val, false, filter).chain( + (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @Override public String toString() { @@ -2275,7 +2273,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAllAsync(ctx, F.t(key, val), true, ctx.noValArray()).get().value(); + return (V)tx.putAsync(ctx, key, val, true, ctx.noValArray()).get().value(); } @Override public String toString() { @@ -2299,8 +2297,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), true, ctx.noValArray()) - .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>) RET2VAL); + return tx.putAsync(ctx, key, val, true, ctx.noValArray()) + .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @Override public String toString() { @@ -2329,7 +2327,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V Boolean stored = syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAllAsync(ctx, F.t(key, val), false, ctx.noValArray()).get().success(); + return tx.putAsync(ctx, key, val, false, ctx.noValArray()).get().success(); } @Override public String toString() { @@ -2358,7 +2356,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<Boolean> fut = asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), false, ctx.noValArray()).chain( + return tx.putAsync(ctx, key, val, false, ctx.noValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @@ -2384,7 +2382,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<V>(true) { @Override public V op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return (V)tx.putAllAsync(ctx, F.t(key, val), true, ctx.hasValArray()).get().value(); + return (V)tx.putAsync(ctx, key, val, true, ctx.hasValArray()).get().value(); } @Override public String toString() { @@ -2408,7 +2406,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V IgniteInternalFuture<V> fut = asyncOp(new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), true, ctx.hasValArray()).chain( + return tx.putAsync(ctx, key, val, true, ctx.hasValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); } @@ -2434,7 +2432,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return syncOp(new SyncOp<Boolean>(true) { @Override public Boolean op(IgniteTxLocalAdapter tx) throws IgniteCheckedException { - return tx.putAllAsync(ctx, F.t(key, val), false, ctx.hasValArray()).get().success(); + return tx.putAsync(ctx, key, val, false, ctx.hasValArray()).get().success(); } @Override public String toString() { @@ -2454,7 +2452,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return asyncOp(new AsyncOp<Boolean>() { @Override public IgniteInternalFuture<Boolean> op(IgniteTxLocalAdapter tx) { - return tx.putAllAsync(ctx, F.t(key, val), false, ctx.hasValArray()).chain( + return tx.putAsync(ctx, key, val, false, ctx.hasValArray()).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>) RET2FLAG); } @@ -2481,7 +2479,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return tx.putAllAsync(ctx, F.t(key, newVal), false, ctx.equalsValArray(oldVal)).get() + return tx.putAsync(ctx, key, newVal, false, ctx.equalsValArray(oldVal)).get() .success(); } @@ -2518,7 +2516,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } } - return tx.putAllAsync(ctx, F.t(key, newVal), false, ctx.equalsValArray(oldVal)).chain( + return tx.putAsync(ctx, key, newVal, false, ctx.equalsValArray(oldVal)).chain( (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); } @@ -2883,8 +2881,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V if (ctx.deploymentEnabled()) ctx.deploy().registerClass(oldVal); - return tx.putAllAsync(ctx, - F.t(key, newVal), + return tx.putAsync(ctx, + key, + newVal, true, ctx.equalsValArray(oldVal)).get(); } @@ -2945,8 +2944,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return new GridFinishedFuture<>(e); } - return (IgniteInternalFuture)tx.putAllAsync(ctx, - F.t(key, newVal), + return (IgniteInternalFuture)tx.putAsync(ctx, + key, + newVal, true, ctx.equalsValArray(oldVal)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index df9f5c4..0786a50 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -74,6 +74,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_EXPIRED; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED; +import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; /** @@ -4014,7 +4015,15 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme IgniteInternalTx tx = cctx.tm().localTxx(); - return tx == null || !tx.removed(txKey()); + if (tx != null) { + IgniteTxEntry e = tx.entry(txKey()); + + boolean rmvd = e != null && e.op() == DELETE; + + return !rmvd; + } + + return true; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index b37742c..4293b90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -582,12 +582,7 @@ public class GridCacheSharedContext<K, V> { * @throws IgniteCheckedException If failed. */ public void endTx(IgniteInternalTx tx) throws IgniteCheckedException { - Collection<Integer> cacheIds = tx.activeCacheIds(); - - if (!cacheIds.isEmpty()) { - for (Integer cacheId : cacheIds) - cacheContext(cacheId).cache().awaitLastFut(); - } + tx.txState().awaitLastFut(this); tx.close(); } @@ -596,22 +591,17 @@ public class GridCacheSharedContext<K, V> { * @param tx Transaction to commit. * @return Commit future. */ + @SuppressWarnings("unchecked") public IgniteInternalFuture<IgniteInternalTx> commitTxAsync(IgniteInternalTx tx) { - Collection<Integer> cacheIds = tx.activeCacheIds(); - - if (cacheIds.isEmpty()) - return tx.commitAsync(); - else if (cacheIds.size() == 1) { - int cacheId = F.first(cacheIds); + GridCacheContext ctx = tx.txState().singleCacheContext(this); - return cacheContext(cacheId).cache().commitTxAsync(tx); - } - else { - for (Integer cacheId : cacheIds) - cacheContext(cacheId).cache().awaitLastFut(); + if (ctx == null) { + tx.txState().awaitLastFut(this); return tx.commitAsync(); } + else + return ctx.cache().commitTxAsync(tx); } /** @@ -620,12 +610,7 @@ public class GridCacheSharedContext<K, V> { * @return Rollback future. */ public IgniteInternalFuture rollbackTxAsync(IgniteInternalTx tx) throws IgniteCheckedException { - Collection<Integer> cacheIds = tx.activeCacheIds(); - - if (!cacheIds.isEmpty()) { - for (Integer cacheId : cacheIds) - cacheContext(cacheId).cache().awaitLastFut(); - } + tx.txState().awaitLastFut(this); return tx.rollbackAsync(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index 2d2d935..8c9f181 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -24,9 +24,7 @@ import java.io.ObjectOutput; import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashSet; -import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -59,15 +57,9 @@ public class GridDistributedTxMapping implements Externalizable { /** DHT version. */ private GridCacheVersion dhtVer; - /** Copy on remove flag. */ - private boolean readOnly; - /** {@code True} if this is last mapping for node. */ private boolean last; - /** IDs of backup nodes receiving last prepare request during this mapping. */ - private Collection<UUID> lastBackups; - /** {@code True} if mapping is for near caches, {@code false} otherwise. */ private boolean near; @@ -91,20 +83,6 @@ public class GridDistributedTxMapping implements Externalizable { } /** - * @return IDs of backup nodes receiving last prepare request during this mapping. - */ - @Nullable public Collection<UUID> lastBackups() { - return lastBackups; - } - - /** - * @param lastBackups IDs of backup nodes receiving last prepare request during this mapping. - */ - public void lastBackups(@Nullable Collection<UUID> lastBackups) { - this.lastBackups = lastBackups; - } - - /** * @return {@code True} if this is last mapping for node. */ public boolean last() { @@ -161,17 +139,6 @@ public class GridDistributedTxMapping implements Externalizable { } /** - * @param entries Mapped entries. - * @param readOnly Flag indicating that passed in collection is read-only. - */ - public void entries(Collection<IgniteTxEntry> entries, boolean readOnly) { - this.entries = entries; - - // Set copy on remove flag as passed in collection is unmodifiable. - this.readOnly = true; - } - - /** * @return {@code True} if lock is explicit. */ public boolean explicitLock() { @@ -221,8 +188,6 @@ public class GridDistributedTxMapping implements Externalizable { * @param entry Adds entry. */ public void add(IgniteTxEntry entry) { - ensureModifiable(); - entries.add(entry); } @@ -231,48 +196,16 @@ public class GridDistributedTxMapping implements Externalizable { * @return {@code True} if entry was removed. */ public boolean removeEntry(IgniteTxEntry entry) { - ensureModifiable(); - return entries.remove(entry); } /** - * @param parts Evicts partitions from mapping. - */ - public void evictPartitions(@Nullable int[] parts) { - if (!F.isEmpty(parts)) { - ensureModifiable(); - - evictPartitions(parts, entries); - } - } - - /** - * @param parts Partitions. - * @param c Collection. - */ - private void evictPartitions(int[] parts, Collection<IgniteTxEntry> c) { - assert parts != null; - - for (Iterator<IgniteTxEntry> it = c.iterator(); it.hasNext();) { - IgniteTxEntry e = it.next(); - - GridCacheEntryEx cached = e.cached(); - - if (U.containsIntArray(parts, cached.partition())) - it.remove(); - } - } - - /** * @param keys Keys to evict readers for. */ public void evictReaders(@Nullable Collection<IgniteTxKey> keys) { if (keys == null || keys.isEmpty()) return; - ensureModifiable(); - evictReaders(keys, entries); } @@ -293,17 +226,6 @@ public class GridDistributedTxMapping implements Externalizable { } /** - * Copies collection of entries if it is read-only. - */ - private void ensureModifiable() { - if (readOnly) { - entries = new LinkedHashSet<>(entries); - - readOnly = false; - } - } - - /** * Whether empty or not. * * @return Empty or not. http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 93303c8..0d49584 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -45,6 +45,8 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteEx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteState; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxState; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; @@ -85,18 +87,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** */ private static final long serialVersionUID = 0L; - /** Read set. */ - @GridToStringInclude - protected Map<IgniteTxKey, IgniteTxEntry> readMap; - - /** Write map. */ - @GridToStringInclude - protected Map<IgniteTxKey, IgniteTxEntry> writeMap; - - /** Remote thread ID. */ - @GridToStringInclude - private long rmtThreadId; - /** Explicit versions. */ @GridToStringInclude private List<GridCacheVersion> explicitVers; @@ -109,6 +99,10 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter @GridToStringInclude private AtomicBoolean commitAllowed = new AtomicBoolean(false); + /** */ + @GridToStringInclude + protected IgniteTxRemoteState txState; + /** * Empty constructor required for {@link Externalizable}. */ @@ -119,7 +113,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** * @param ctx Cache registry. * @param nodeId Node ID. - * @param rmtThreadId Remote thread ID. * @param xidVer XID version. * @param commitVer Commit version. * @param sys System flag. @@ -135,7 +128,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter public GridDistributedTxRemoteAdapter( GridCacheSharedContext<?, ?> ctx, UUID nodeId, - long rmtThreadId, GridCacheVersion xidVer, GridCacheVersion commitVer, boolean sys, @@ -163,7 +155,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter subjId, taskNameHash); - this.rmtThreadId = rmtThreadId; this.invalidate = invalidate; commitVersion(commitVer); @@ -173,6 +164,11 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ + @Override public IgniteTxState txState() { + return txState; + } + + /** {@inheritDoc} */ @Override public UUID eventNodeId() { return nodeId; } @@ -188,11 +184,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ - @Override public Collection<Integer> activeCacheIds() { - return Collections.emptyList(); - } - - /** {@inheritDoc} */ @Override public boolean activeCachesDeploymentEnabled() { return false; } @@ -201,14 +192,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @return Checks if transaction has no entries. */ @Override public boolean empty() { - return readMap.isEmpty() && writeMap.isEmpty(); - } - - /** {@inheritDoc} */ - @Override public boolean removed(IgniteTxKey key) { - IgniteTxEntry e = writeMap.get(key); - - return e != null && e.op() == DELETE; + return txState.empty(); } /** {@inheritDoc} */ @@ -218,12 +202,12 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { - return writeMap; + return txState.writeMap(); } /** {@inheritDoc} */ @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { - return readMap; + return txState.readMap(); } /** {@inheritDoc} */ @@ -245,12 +229,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public IgniteTxEntry entry(IgniteTxKey key) { - IgniteTxEntry e = writeMap == null ? null : writeMap.get(key); - - if (e == null) - e = readMap == null ? null : readMap.get(key); - - return e; + return txState.entry(key); } /** @@ -259,8 +238,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @param key key to be removed. */ public void clearEntry(IgniteTxKey key) { - readMap.remove(key); - writeMap.remove(key); + txState.clearEntry(key); } /** @@ -268,13 +246,19 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @param committedVers Committed versions. * @param rolledbackVers Rolled back versions. */ - @Override public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, - Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) { + @Override public void doneRemote(GridCacheVersion baseVer, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers, + Collection<GridCacheVersion> pendingVers) { + Map<IgniteTxKey, IgniteTxEntry> readMap = txState.readMap(); + if (readMap != null && !readMap.isEmpty()) { for (IgniteTxEntry txEntry : readMap.values()) doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers); } + Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap(); + if (writeMap != null && !writeMap.isEmpty()) { for (IgniteTxEntry txEntry : writeMap.values()) doneRemote(txEntry, baseVer, committedVers, rolledbackVers, pendingVers); @@ -290,8 +274,10 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter * @param rolledbackVers Rolled back versions relative to base version. * @param pendingVers Pending versions. */ - private void doneRemote(IgniteTxEntry txEntry, GridCacheVersion baseVer, - Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers, + private void doneRemote(IgniteTxEntry txEntry, + GridCacheVersion baseVer, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers) { while (true) { GridDistributedCacheEntry entry = (GridDistributedCacheEntry)txEntry.cached(); @@ -337,59 +323,9 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter return started; } - /** - * @return Remote node thread ID. - */ - @Override public long remoteThreadId() { - return rmtThreadId; - } - - /** - * @param e Transaction entry to set. - * @return {@code True} if value was set. - */ - @Override public boolean setWriteValue(IgniteTxEntry e) { - checkInternal(e.txKey()); - - IgniteTxEntry entry = writeMap.get(e.txKey()); - - if (entry == null) { - IgniteTxEntry rmv = readMap.remove(e.txKey()); - - if (rmv != null) { - e.cached(rmv.cached()); - - writeMap.put(e.txKey(), e); - } - // If lock is explicit. - else { - e.cached(e.context().cache().entryEx(e.key())); - - // explicit lock. - writeMap.put(e.txKey(), e); - } - } - else { - // Copy values. - entry.value(e.value(), e.hasWriteValue(), e.hasReadValue()); - entry.entryProcessors(e.entryProcessors()); - entry.op(e.op()); - entry.ttl(e.ttl()); - entry.explicitVersion(e.explicitVersion()); - - // Conflict resolution stuff. - entry.conflictVersion(e.conflictVersion()); - entry.conflictExpireTime(e.conflictExpireTime()); - } - - addExplicit(e); - - return true; - } - /** {@inheritDoc} */ @Override public boolean hasWriteKey(IgniteTxKey key) { - return writeMap.containsKey(key); + return txState.hasWriteKey(key); } /** {@inheritDoc} */ @@ -400,27 +336,27 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public Set<IgniteTxKey> readSet() { - return readMap.keySet(); + return txState.readSet(); } /** {@inheritDoc} */ @Override public Set<IgniteTxKey> writeSet() { - return writeMap.keySet(); + return txState.writeSet(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> allEntries() { - return F.concat(false, writeEntries(), readEntries()); + return txState.allEntries(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> writeEntries() { - return writeMap.values(); + return txState.writeEntries(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> readEntries() { - return readMap.values(); + return txState.readEntries(); } /** @@ -459,7 +395,7 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter @SuppressWarnings({"CatchGenericClass"}) private void commitIfLocked() throws IgniteCheckedException { if (state() == COMMITTING) { - for (IgniteTxEntry txEntry : writeMap.values()) { + for (IgniteTxEntry txEntry : writeEntries()) { assert txEntry != null : "Missing transaction entry for tx: " + this; while (true) { @@ -494,6 +430,8 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter if (commitAllowed.compareAndSet(false, true)) { IgniteCheckedException err = null; + Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap(); + if (!F.isEmpty(writeMap)) { // Register this transaction as completed prior to write-phase to // ensure proper lock ordering for removed entries. http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 3069afd..fe91e5b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -228,7 +228,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach req.futureId(), nodeId, req.nearXidVersion(), - req.threadId(), req.topologyVersion(), req.version(), /*commitVer*/null, http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 44f34aa..2bed931 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -223,11 +223,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } /** {@inheritDoc} */ - @Override protected IgniteUuid nearMiniId() { - return nearMiniId; - } - - /** {@inheritDoc} */ @Override public boolean dht() { return true; } @@ -307,8 +302,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa 0, nearMiniId, null, - true, - null); + true); } // For pessimistic mode we don't distribute prepare request. @@ -322,8 +316,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa nearMiniId, Collections.<IgniteTxKey, GridCacheVersion>emptyMap(), true, - needReturnValue(), - null))) + needReturnValue()))) return prepFut.get(); } else @@ -378,7 +371,6 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @param nearMiniId Near mini future ID. * @param txNodes Transaction nodes mapping. * @param last {@code True} if this is last prepare request. - * @param lastBackups IDs of backup nodes receiving last prepare request. * @return Future that will be completed when locks are acquired. */ public IgniteInternalFuture<GridNearTxPrepareResponse> prepareAsync( @@ -388,8 +380,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa long msgId, IgniteUuid nearMiniId, Map<UUID, Collection<UUID>> txNodes, - boolean last, - Collection<UUID> lastBackups + boolean last ) { // In optimistic mode prepare still can be called explicitly from salvageTx. GridDhtTxPrepareFuture fut = prepFut.get(); @@ -404,8 +395,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa nearMiniId, verMap, last, - needReturnValue(), - lastBackups))) { + needReturnValue()))) { GridDhtTxPrepareFuture f = prepFut.get(); assert f.nearMiniId().equals(nearMiniId) : "Wrong near mini id on existing future " + @@ -443,13 +433,15 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa } try { - if (reads != null) + if (reads != null) { for (IgniteTxEntry e : reads) addEntry(msgId, e); + } - if (writes != null) + if (writes != null) { for (IgniteTxEntry e : writes) addEntry(msgId, e); + } userPrepare(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 6de8795..70ebf3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -86,9 +86,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { protected volatile boolean mapped; /** */ - private long dhtThreadId; - - /** */ protected boolean explicitLock; /** Versions of pending locks for entries of this tx. */ @@ -159,7 +156,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { this.explicitLock = explicitLock; threadId = Thread.currentThread().getId(); - dhtThreadId = threadId; } /** @@ -216,11 +212,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { protected abstract IgniteUuid nearFutureId(); /** - * @return Near future mini ID. - */ - protected abstract IgniteUuid nearMiniId(); - - /** * Adds reader to cached entry. * * @param msgId Message ID. @@ -260,13 +251,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** - * @return DHT thread ID. - */ - long dhtThreadId() { - return dhtThreadId; - } - - /** * Map explicit locks. */ protected void mapExplicitLocks() { @@ -355,22 +339,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** - * @param nodeId Node ID. - * @return Mapping. - */ - GridDistributedTxMapping dhtMapping(UUID nodeId) { - return dhtMap.get(nodeId); - } - - /** - * @param nodeId Node ID. - * @return Mapping. - */ - GridDistributedTxMapping nearMapping(UUID nodeId) { - return nearMap.get(nodeId); - } - - /** * @param mappings Mappings to add. */ void addDhtNodeEntryMapping(Map<ClusterNode, List<GridDhtCacheEntry>> mappings) { @@ -385,19 +353,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** - * @param mappings Mappings to add. - */ - public void addDhtMapping(Map<UUID, GridDistributedTxMapping> mappings) { - addMapping0(mappings, dhtMap); - } - - /** - * @param mappings Mappings to add. - */ - public void addNearMapping(Map<UUID, GridDistributedTxMapping> mappings) { - addMapping0(mappings, nearMap); - } - /** * @param nodeId Node ID. * @return {@code True} if mapping was removed. */ @@ -435,7 +390,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (log.isDebugEnabled()) log.debug("Removing mapping for entry [nodeId=" + nodeId + ", entry=" + entry + ']'); - IgniteTxEntry txEntry = txMap.get(entry.txKey()); + IgniteTxEntry txEntry = entry(entry.txKey()); if (txEntry == null) return false; @@ -469,7 +424,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { List<GridDhtCacheEntry> entries = mapping.getValue(); for (GridDhtCacheEntry entry : entries) { - IgniteTxEntry txEntry = txMap.get(entry.txKey()); + IgniteTxEntry txEntry = entry(entry.txKey()); if (txEntry != null) { if (m == null) @@ -481,26 +436,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } } - /** - * @param mappings Mappings to add. - * @param dst Map to add to. - */ - private void addMapping0( - Map<UUID, GridDistributedTxMapping> mappings, - Map<UUID, GridDistributedTxMapping> dst - ) { - for (Map.Entry<UUID, GridDistributedTxMapping> entry : mappings.entrySet()) { - GridDistributedTxMapping targetMapping = dst.get(entry.getKey()); - - if (targetMapping == null) - dst.put(entry.getKey(), entry.getValue()); - else { - for (IgniteTxEntry txEntry : entry.getValue().entries()) - targetMapping.add(txEntry); - } - } - } - /** {@inheritDoc} */ @Override public void addInvalidPartition(GridCacheContext ctx, int part) { assert false : "DHT transaction encountered invalid partition [part=" + part + ", tx=" + this + ']'; @@ -529,7 +464,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { GridDhtCacheAdapter dhtCache = cacheCtx.isNear() ? cacheCtx.near().dht() : cacheCtx.dht(); try { - IgniteTxEntry existing = txMap.get(e.txKey()); + IgniteTxEntry existing = entry(e.txKey()); if (existing != null) { // Must keep NOOP operation if received READ because it means that the lock was sent to a backup node. @@ -569,7 +504,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { existing.explicitVersion(dhtVer); } - txMap.put(existing.txKey(), existing); + txState.addEntry(existing); if (log.isDebugEnabled()) log.debug("Added entry to transaction: " + existing); @@ -705,7 +640,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { passedKeys, read, needRetVal, - skipped, accessTtl, null, skipStore); @@ -723,7 +657,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @param passedKeys Passed keys. * @param read {@code True} if read. * @param needRetVal Return value flag. - * @param skipped Skipped keys. * @param accessTtl TTL for read operation. * @param filter Entry write filter. * @param skipStore Skip store flag. @@ -735,13 +668,11 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { final Collection<KeyCacheObject> passedKeys, final boolean read, final boolean needRetVal, - final Set<KeyCacheObject> skipped, final long accessTtl, @Nullable final CacheEntryPredicate[] filter, boolean skipStore) { if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock on keys [passedKeys=" + passedKeys + ", skipped=" + - skipped + ']'); + log.debug("Before acquiring transaction lock on keys [keys=" + passedKeys + ']'); if (passedKeys.isEmpty()) return new GridFinishedFuture<>(ret); @@ -768,7 +699,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { postLockWrite(cacheCtx, passedKeys, - skipped, ret, /*remove*/false, /*retval*/false, http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java index 55cbe96..9ec35b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxMapping.java @@ -17,18 +17,15 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.ArrayList; import java.util.Collection; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.util.GridLeanMap; +import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.typedef.F; -import org.jetbrains.annotations.Nullable; +import org.apache.ignite.internal.util.typedef.internal.U; /** * DHT transaction mapping. @@ -37,12 +34,6 @@ public class GridDhtTxMapping { /** Transaction nodes mapping (primary node -> related backup nodes). */ private final Map<UUID, Collection<UUID>> txNodes = new GridLeanMap<>(); - /** */ - private final List<TxMapping> mappings = new ArrayList<>(); - - /** */ - private TxMapping last; - /** * Adds information about next mapping. * @@ -50,24 +41,26 @@ public class GridDhtTxMapping { */ @SuppressWarnings("ConstantConditions") public void addMapping(List<ClusterNode> nodes) { - ClusterNode primary = F.first(nodes); + assert !F.isEmpty(nodes) : nodes; - Collection<ClusterNode> backups = F.view(nodes, F.notEqualTo(primary)); + ClusterNode primary = nodes.get(0); - if (last == null || !last.primary.equals(primary.id())) { - last = new TxMapping(primary, backups); + int size = nodes.size(); - mappings.add(last); - } - else - last.add(backups); + if (size > 1) { + Collection<UUID> backups = txNodes.get(primary.id()); - Collection<UUID> storedBackups = txNodes.get(last.primary); + if (backups == null) { + backups = U.newHashSet(size - 1); - if (storedBackups == null) - txNodes.put(last.primary, storedBackups = new HashSet<>()); + txNodes.put(primary.id(), backups); + } - storedBackups.addAll(last.backups); + for (int i = 1; i < size; i++) + backups.add(nodes.get(i).id()); + } + else + txNodes.put(primary.id(), new GridLeanSet<UUID>()); } /** @@ -76,99 +69,4 @@ public class GridDhtTxMapping { public Map<UUID, Collection<UUID>> transactionNodes() { return txNodes; } - - /** - * For each mapping sets flags indicating if mapping is last for node. - * - * @param mappings Mappings. - */ - public void initLast(Collection<GridDistributedTxMapping> mappings) { - assert this.mappings.size() == mappings.size(); - - int idx = 0; - - for (GridDistributedTxMapping map : mappings) { - TxMapping mapping = this.mappings.get(idx); - - map.lastBackups(lastBackups(mapping, idx)); - - boolean last = true; - - for (int i = idx + 1; i < this.mappings.size(); i++) { - TxMapping nextMap = this.mappings.get(i); - - if (nextMap.primary.equals(mapping.primary)) { - last = false; - - break; - } - } - - map.last(last); - - idx++; - } - } - - /** - * @param mapping Mapping. - * @param idx Mapping index. - * @return IDs of backup nodes receiving last prepare request during this mapping. - */ - @Nullable private Collection<UUID> lastBackups(TxMapping mapping, int idx) { - Collection<UUID> res = null; - - for (UUID backup : mapping.backups) { - boolean foundNext = false; - - for (int i = idx + 1; i < mappings.size(); i++) { - TxMapping nextMap = mappings.get(i); - - if (nextMap.primary.equals(mapping.primary) && nextMap.backups.contains(backup)) { - foundNext = true; - - break; - } - } - - if (!foundNext) { - if (res == null) - res = new ArrayList<>(mapping.backups.size()); - - res.add(backup); - } - } - - return res; - } - - /** - */ - private static class TxMapping { - /** */ - private final UUID primary; - - /** */ - private final Set<UUID> backups; - - /** - * @param primary Primary node. - * @param backups Backup nodes. - */ - private TxMapping(ClusterNode primary, Iterable<ClusterNode> backups) { - this.primary = primary.id(); - - this.backups = new HashSet<>(); - - add(backups); - } - - /** - * @param backups Backup nodes. - */ - private void add(Iterable<ClusterNode> backups) { - for (ClusterNode n : backups) - this.backups.add(n.id()); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 1d6f633..a67950d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -165,9 +165,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** {@code True} if this is last prepare operation for node. */ private boolean last; - /** IDs of backup nodes receiving last prepare request during this prepare. */ - private Collection<UUID> lastBackups; - /** Needs return value flag. */ private boolean retVal; @@ -197,7 +194,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param dhtVerMap DHT versions map. * @param last {@code True} if this is last prepare operation for node. * @param retVal Return value flag. - * @param lastBackups IDs of backup nodes receiving last prepare request during this prepare. */ public GridDhtTxPrepareFuture( GridCacheSharedContext cctx, @@ -205,8 +201,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter IgniteUuid nearMiniId, Map<IgniteTxKey, GridCacheVersion> dhtVerMap, boolean last, - boolean retVal, - Collection<UUID> lastBackups + boolean retVal ) { super(REDUCER); @@ -214,7 +209,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter this.tx = tx; this.dhtVerMap = dhtVerMap; this.last = last; - this.lastBackups = lastBackups; futId = IgniteUuid.randomUuid(); @@ -864,14 +858,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } /** - * @param backupId Backup node ID. - * @return {@code True} if backup node receives last prepare request for this transaction. - */ - private boolean lastBackup(UUID backupId) { - return lastBackups != null && lastBackups.contains(backupId); - } - - /** * Checks if this transaction needs previous value for the given tx entry. Will use passed in map to store * required key or will create new map if passed in map is {@code null}. * @@ -1022,18 +1008,15 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.writeVersion(cctx.versions().next(tx.topologyVersion())); { - Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>(); - Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>(); - // Assign keys to primary nodes. if (!F.isEmpty(writes)) { for (IgniteTxEntry write : writes) - map(tx.entry(write.txKey()), futDhtMap, futNearMap); + map(tx.entry(write.txKey())); } if (!F.isEmpty(reads)) { for (IgniteTxEntry read : reads) - map(tx.entry(read.txKey()), futDhtMap, futNearMap); + map(tx.entry(read.txKey())); } } @@ -1225,14 +1208,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** * @param entry Transaction entry. - * @param futDhtMap DHT mapping. - * @param futNearMap Near mapping. */ - private void map( - IgniteTxEntry entry, - Map<UUID, GridDistributedTxMapping> futDhtMap, - Map<UUID, GridDistributedTxMapping> futNearMap - ) { + private void map(IgniteTxEntry entry) { if (entry.cached().isLocal()) return; @@ -1258,26 +1235,25 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter log.debug("Mapping entry to DHT nodes [nodes=" + U.toShortString(dhtNodes) + ", entry=" + entry + ']'); - Collection<UUID> readers = cached.readers(); + // Exclude local node. + map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap); - Collection<ClusterNode> nearNodes = null; + Collection<UUID> readers = cached.readers(); if (!F.isEmpty(readers)) { - nearNodes = cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId()))); + Collection<ClusterNode> nearNodes = + cctx.discovery().nodes(readers, F0.not(F.idForNodeId(tx.nearNodeId()))); if (log.isDebugEnabled()) log.debug("Mapping entry to near nodes [nodes=" + U.toShortString(nearNodes) + ", entry=" + entry + ']'); + + // Exclude DHT nodes. + map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap); } else if (log.isDebugEnabled()) log.debug("Entry has no near readers: " + entry); - // Exclude local node. - map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap); - - // Exclude DHT nodes. - map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap); - break; } catch (GridCacheEntryRemovedException ignore) { @@ -1292,13 +1268,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param entry Entry. * @param nodes Nodes. * @param globalMap Map. - * @param locMap Exclude map. */ private void map( IgniteTxEntry entry, Iterable<ClusterNode> nodes, - Map<UUID, GridDistributedTxMapping> globalMap, - Map<UUID, GridDistributedTxMapping> locMap + Map<UUID, GridDistributedTxMapping> globalMap ) { if (nodes != null) { for (ClusterNode n : nodes) { @@ -1321,13 +1295,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter globalMap.put(n.id(), global = new GridDistributedTxMapping(n)); global.add(entry); - - GridDistributedTxMapping loc = locMap.get(n.id()); - - if (loc == null) - locMap.put(n.id(), loc = new GridDistributedTxMapping(n)); - - loc.add(entry); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index e268a88..0cbe10f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -36,6 +36,8 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxRemoteAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteSingleStateImpl; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteStateImpl; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.typedef.T2; @@ -77,7 +79,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param nearNodeId Near node ID. * @param rmtFutId Remote future ID. * @param nodeId Node ID. - * @param rmtThreadId Remote thread ID. * @param topVer Topology version. * @param xidVer XID version. * @param commitVer Commit version. @@ -96,7 +97,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { UUID nearNodeId, IgniteUuid rmtFutId, UUID nodeId, - long rmtThreadId, AffinityTopologyVersion topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, @@ -110,12 +110,12 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { GridCacheVersion nearXidVer, Map<UUID, Collection<UUID>> txNodes, @Nullable UUID subjId, - int taskNameHash + int taskNameHash, + boolean single ) { super( ctx, nodeId, - rmtThreadId, xidVer, commitVer, sys, @@ -137,9 +137,10 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.nearXidVer = nearXidVer; this.txNodes = txNodes; - readMap = Collections.emptyMap(); - - writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1); + txState = single ? new IgniteTxRemoteSingleStateImpl() : + new IgniteTxRemoteStateImpl( + Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), + new ConcurrentLinkedHashMap<IgniteTxKey, IgniteTxEntry>(U.capacity(txSize), 0.75f, 1)); topologyVersion(topVer); } @@ -151,7 +152,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { * @param rmtFutId Remote future ID. * @param nodeId Node ID. * @param nearXidVer Near transaction ID. - * @param rmtThreadId Remote thread ID. * @param topVer Topology version. * @param xidVer XID version. * @param commitVer Commit version. @@ -169,7 +169,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { IgniteUuid rmtFutId, UUID nodeId, GridCacheVersion nearXidVer, - long rmtThreadId, AffinityTopologyVersion topVer, GridCacheVersion xidVer, GridCacheVersion commitVer, @@ -186,7 +185,6 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { super( ctx, nodeId, - rmtThreadId, xidVer, commitVer, sys, @@ -207,8 +205,9 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.nearNodeId = nearNodeId; this.rmtFutId = rmtFutId; - readMap = Collections.emptyMap(); - writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1); + txState = new IgniteTxRemoteStateImpl( + Collections.<IgniteTxKey, IgniteTxEntry>emptyMap(), + new ConcurrentLinkedHashMap<IgniteTxKey, IgniteTxEntry>(U.capacity(txSize), 0.75f, 1)); topologyVersion(topVer); } @@ -280,6 +279,8 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { @Override public void addInvalidPartition(GridCacheContext cacheCtx, int part) { super.addInvalidPartition(cacheCtx, part); + Map<IgniteTxKey, IgniteTxEntry> writeMap = txState.writeMap(); + for (Iterator<IgniteTxEntry> it = writeMap.values().iterator(); it.hasNext();) { IgniteTxEntry e = it.next(); @@ -312,7 +313,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { // Initialize cache entry. entry.cached(cached); - writeMap.put(entry.txKey(), entry); + txState.addWriteEntry(entry.txKey(), entry); addExplicit(entry); } @@ -356,7 +357,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { txEntry.entryProcessors(entryProcessors); - writeMap.put(key, txEntry); + txState.addWriteEntry(key, txEntry); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 5488bb1..29774a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -29,7 +29,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -57,7 +56,6 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -326,19 +324,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim txMapping = new GridDhtTxMapping(); - if (!F.isEmpty(reads) || !F.isEmpty(writes)) { - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + - "partition nodes left the grid): " + cacheCtx.name())); - - return; - } - } - } - Map<IgniteBiTuple<ClusterNode, Boolean>, GridDistributedTxMapping> mappings = new HashMap<>(); for (IgniteTxEntry write : writes) @@ -437,7 +422,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim m.near(), txMapping.transactionNodes(), m.last(), - m.lastBackups(), tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), tx.implicitSingle(), http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 0002180..791d2f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.util.ArrayDeque; import java.util.Collection; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; @@ -28,7 +30,6 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; -import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -50,7 +51,6 @@ import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; @@ -267,7 +267,12 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa return; } - prepare(tx.writeEntries(), topLocked); + IgniteTxEntry singleWrite = tx.singleWrite(); + + if (singleWrite != null) + prepareSingle(singleWrite, topLocked); + else + prepare(tx.writeEntries(), topLocked); markInitialized(); } @@ -280,6 +285,46 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } /** + * @param write Write. + * @param topLocked {@code True} if thread already acquired lock preventing topology change. + */ + private void prepareSingle(IgniteTxEntry write, boolean topLocked) { + AffinityTopologyVersion topVer = tx.topologyVersion(); + + assert topVer.topologyVersion() > 0; + + txMapping = new GridDhtTxMapping(); + + GridDistributedTxMapping mapping = map(write, topVer, null, topLocked); + + if (mapping.node().isLocal()) { + if (write.context().isNear()) + tx.nearLocallyMapped(true); + else if (write.context().isColocated()) + tx.colocatedLocallyMapped(true); + } + + if (isDone()) { + if (log.isDebugEnabled()) + log.debug("Abandoning (re)map because future is done: " + this); + + return; + } + + tx.addSingleEntryMapping(mapping, write); + + cctx.mvcc().recheckPendingLocks(); + + mapping.last(true); + + tx.transactionNodes(txMapping.transactionNodes()); + + checkOnePhase(); + + proceedPrepare(mapping, null); + } + + /** * @param writes Write entries. * @param topLocked {@code True} if thread already acquired lock preventing topology change. * @throws IgniteCheckedException If failed. @@ -294,30 +339,26 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa txMapping = new GridDhtTxMapping(); - Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>(); - - if (!F.isEmpty(writes)) { - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (CU.affinityNodes(cacheCtx, topVer).isEmpty()) { - onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all " + - "partition nodes left the grid): " + cacheCtx.name())); - - return; - } - } - } + Map<UUID, GridDistributedTxMapping> map = new HashMap<>(); // Assign keys to primary nodes. GridDistributedTxMapping cur = null; + Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>(); + for (IgniteTxEntry write : writes) { GridDistributedTxMapping updated = map(write, topVer, cur, topLocked); if (cur != updated) { mappings.offer(updated); + updated.last(true); + + GridDistributedTxMapping prev = map.put(updated.node().id(), updated); + + if (prev != null) + prev.last(false); + if (updated.node().isLocal()) { if (write.context().isNear()) tx.nearLocallyMapped(true); @@ -340,8 +381,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa cctx.mvcc().recheckPendingLocks(); - txMapping.initLast(mappings); - tx.transactionNodes(txMapping.transactionNodes()); checkOnePhase(); @@ -355,14 +394,24 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * @param mappings Queue of mappings. */ private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) { - if (isDone()) - return; - final GridDistributedTxMapping m = mappings.poll(); if (m == null) return; + proceedPrepare(m, mappings); + } + + /** + * Continues prepare after previous mapping successfully finished. + * + * @param m Mapping. + * @param mappings Queue of mappings. + */ + private void proceedPrepare(GridDistributedTxMapping m, @Nullable final Queue<GridDistributedTxMapping> mappings) { + if (isDone()) + return; + assert !m.empty(); final ClusterNode n = m.node(); @@ -376,7 +425,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa m.near(), txMapping.transactionNodes(), m.last(), - m.lastBackups(), tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), tx.implicitSingle(), @@ -457,7 +505,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa ) { GridCacheContext cacheCtx = entry.context(); - List<ClusterNode> nodes = cacheCtx.affinity().nodes(entry.key(), topVer); + List<ClusterNode> nodes; + + GridCacheEntryEx cached0 = entry.cached(); + + if (cached0.isDht()) + nodes = cacheCtx.affinity().nodes(cached0.partition(), topVer); + else + nodes = cacheCtx.affinity().nodes(entry.key(), topVer); txMapping.addMapping(nodes); http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index fd9183e..6b7244a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -20,13 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.util.lang.GridPlainRunnable; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; /** @@ -75,56 +73,14 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT * @return Topology ready future. */ protected final GridDhtTopologyFuture topologyReadLock() { - if (tx.activeCacheIds().isEmpty()) - return cctx.exchange().lastTopologyFuture(); - - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx == null) - return cctx.exchange().lastTopologyFuture(); - - nonLocCtx.topology().readLock(); - - if (nonLocCtx.topology().stopping()) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + - nonLocCtx.name())); - - return null; - } - - return nonLocCtx.topology().topologyVersionFuture(); + return tx.txState().topologyReadLock(cctx, this); } /** * Releases topology read lock. */ protected final void topologyReadUnlock() { - if (!tx.activeCacheIds().isEmpty()) { - GridCacheContext<?, ?> nonLocCtx = null; - - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); - - if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; - - break; - } - } - - if (nonLocCtx != null) - nonLocCtx.topology().readUnlock(); - } + tx.txState().topologyReadUnlock(cctx); } /** @@ -160,28 +116,10 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT } if (topVer != null) { - StringBuilder invalidCaches = null; - - for (Integer cacheId : tx.activeCacheIds()) { - GridCacheContext ctx = cctx.cacheContext(cacheId); - - assert ctx != null : cacheId; - - Throwable err = topFut.validateCache(ctx); - - if (err != null) { - if (invalidCaches != null) - invalidCaches.append(", "); - else - invalidCaches = new StringBuilder(); - - invalidCaches.append(U.maskName(ctx.name())); - } - } + IgniteCheckedException err = tx.txState().validateTopology(cctx, topFut); - if (invalidCaches != null) { - onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " + - invalidCaches.toString())); + if (err != null) { + onDone(err); return; } http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 11d31b2..1554a62 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -200,7 +200,6 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA m.near(), txMapping.transactionNodes(), true, - txMapping.transactionNodes().get(node.id()), tx.onePhaseCommit(), tx.needReturnValue() && tx.implicit(), tx.implicitSingle(), http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 5ab85b2..e8546ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -315,7 +315,6 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> nodeId, req.nearNodeId(), req.nearXidVersion(), - req.threadId(), req.version(), null, ctx.systemTx(), http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index a9dbda2..31aa8c3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -88,7 +88,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu private boolean commit; /** Node mappings. */ - private Map<UUID, GridDistributedTxMapping> mappings; + private IgniteTxMappings mappings; /** Trackable flag. */ private boolean trackable = true; @@ -347,8 +347,16 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu try { if (tx.finish(commit) || (!commit && tx.state() == UNKNOWN)) { - if ((tx.onePhaseCommit() && needFinishOnePhase()) || (!tx.onePhaseCommit() && mappings != null)) - finish(mappings.values()); + if ((tx.onePhaseCommit() && needFinishOnePhase()) || (!tx.onePhaseCommit() && mappings != null)) { + if (mappings.single()) { + GridDistributedTxMapping mapping = mappings.singleMapping(); + + if (mapping != null) + finish(mapping); + } + else + finish(mappings.mappings()); + } markInitialized(); @@ -381,11 +389,10 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * */ private void checkBackup() { - assert mappings.size() <= 1; + GridDistributedTxMapping mapping = mappings.singleMapping(); - for (Map.Entry<UUID, GridDistributedTxMapping> entry : mappings.entrySet()) { - UUID nodeId = entry.getKey(); - GridDistributedTxMapping mapping = entry.getValue(); + if (mapping != null) { + UUID nodeId = mapping.node().id(); Collection<UUID> backups = tx.transactionNodes().get(nodeId); @@ -482,25 +489,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * */ private boolean needFinishOnePhase() { - if (F.isEmpty(tx.mappings())) + if (tx.mappings().empty()) return false; - assert tx.mappings().size() == 1; - - boolean finish = false; - - for (Integer cacheId : tx.activeCacheIds()) { - GridCacheContext<K, V> cacheCtx = cctx.cacheContext(cacheId); - - if (cacheCtx.isNear()) { - finish = true; - - break; - } - } + boolean finish = tx.txState().hasNearCache(cctx); if (finish) { - GridDistributedTxMapping mapping = F.first(tx.mappings().values()); + GridDistributedTxMapping mapping = tx.mappings().singleMapping(); if (FINISH_NEAR_ONE_PHASE_SINCE.compareTo(mapping.node().version()) > 0) finish = false; @@ -520,18 +515,16 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu finishOnePhaseCalled = true; - // No need to send messages as transaction was already committed on remote node. - // Finish local mapping only as we need send commit message to backups. - for (GridDistributedTxMapping m : mappings.values()) { - if (m.node().isLocal()) { - IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finishColocatedLocal(commit, tx); + GridDistributedTxMapping locMapping = mappings.localMapping(); - // Add new future. - if (fut != null) - add(fut); + if (locMapping != null) { + // No need to send messages as transaction was already committed on remote node. + // Finish local mapping only as we need send commit message to backups. + IgniteInternalFuture<IgniteInternalTx> fut = cctx.tm().txHandler().finishColocatedLocal(commit, tx); - break; - } + // Add new future. + if (fut != null) + add(fut); } } @@ -544,7 +537,9 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu mapping.dhtVersion(xidVer, xidVer); - tx.readyNearLocks(mapping, Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList(), + tx.readyNearLocks(mapping, + Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList(), Collections.<GridCacheVersion>emptyList()); } }