Performance optimizations.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a4848a70 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a4848a70 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a4848a70 Branch: refs/heads/ignite-638 Commit: a4848a702fea1573af7f36af91d02f7df3ab64f4 Parents: 7393227 Author: sboikov <sboi...@gridgain.com> Authored: Mon Nov 9 12:16:16 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Mon Nov 9 12:16:16 2015 +0300 ---------------------------------------------------------------------- .../managers/communication/GridIoMessage.java | 4 +- .../processors/cache/GridCacheContext.java | 29 +-- .../processors/cache/GridCacheEntryEx.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 55 ++-- .../processors/cache/GridCacheMvccManager.java | 145 +++++------ .../distributed/GridDistributedCacheEntry.java | 2 +- .../distributed/GridDistributedTxMapping.java | 8 +- .../GridDistributedTxRemoteAdapter.java | 5 +- .../distributed/dht/GridDhtLockFuture.java | 7 +- .../dht/GridDhtTransactionalCacheAdapter.java | 13 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 43 ++-- .../distributed/dht/GridDhtTxPrepareFuture.java | 78 +++--- .../cache/distributed/dht/GridDhtTxRemote.java | 45 ++-- .../dht/atomic/GridDhtAtomicCache.java | 1 - .../dht/colocated/GridDhtColocatedCache.java | 7 +- .../near/GridNearOptimisticTxPrepareFuture.java | 11 +- .../near/GridNearTransactionalCache.java | 7 +- .../near/GridNearTxFinishFuture.java | 157 ++++++------ .../cache/distributed/near/GridNearTxLocal.java | 21 +- .../cache/transactions/IgniteInternalTx.java | 3 +- .../cache/transactions/IgniteTxAdapter.java | 251 ++++++++++--------- .../cache/transactions/IgniteTxHandler.java | 37 +-- .../transactions/IgniteTxLocalAdapter.java | 26 +- .../cache/transactions/IgniteTxManager.java | 171 ++++++------- .../GridBoundedConcurrentLinkedHashMap.java | 7 +- .../GridBoundedConcurrentLinkedHashSet.java | 7 +- .../util/GridBoundedConcurrentOrderedMap.java | 39 +-- .../internal/util/GridConcurrentFactory.java | 11 +- .../util/GridConcurrentLinkedHashSet.java | 9 +- .../ignite/internal/util/IgniteUuidCache.java | 6 +- .../util/future/GridCompoundFuture.java | 155 ++++++++---- .../java/org/jsr166/ConcurrentHashMap8.java | 2 +- .../java/org/jsr166/ConcurrentLinkedDeque8.java | 2 +- .../org/jsr166/ConcurrentLinkedHashMap.java | 195 +++++++++++--- .../GridCacheAffinityBackupsSelfTest.java | 8 + .../cache/GridCacheAbstractFullApiSelfTest.java | 2 +- .../GridCacheMissingCommitVersionSelfTest.java | 40 +-- .../processors/cache/GridCacheTestEntryEx.java | 3 +- ...achePartitionedMultiNodeFullApiSelfTest.java | 2 +- .../continuous/GridEventConsumeSelfTest.java | 2 +- ...dBoundedConcurrentLinkedHashMapSelfTest.java | 2 +- .../GridConcurrentLinkedHashMapSelfTest.java | 62 ++++- .../junits/common/GridCommonAbstractTest.java | 4 +- ...rrentLinkedHashMapMultiThreadedSelfTest.java | 104 ++++---- .../yardstick/cache/IgnitePutTxBenchmark.java | 10 + .../cache/IgnitePutTxPrimaryOnlyBenchmark.java | 65 +++++ .../IgnitePutTxSkipLocalBackupBenchmark.java | 65 +++++ .../cache/WaitMapExchangeFinishCallable.java | 95 +++++++ 48 files changed, 1220 insertions(+), 807 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java index c83a281..cb19ba0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java @@ -143,7 +143,7 @@ public class GridIoMessage implements Message { /** * @return Message. */ - public Object message() { + public Message message() { return msg; } @@ -320,4 +320,4 @@ public class GridIoMessage implements Message { @Override public String toString() { return S.toString(GridIoMessage.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 1f4852c..ee4da46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1488,10 +1488,9 @@ public class GridCacheContext<K, V> implements Externalizable { * @param log Log. * @param dhtMap Dht mappings. * @param nearMap Near mappings. - * @return {@code True} if mapped. * @throws GridCacheEntryRemovedException If reader for entry is removed. */ - public boolean dhtMap( + public void dhtMap( UUID nearNodeId, AffinityTopologyVersion topVer, GridDhtCacheEntry entry, @@ -1509,7 +1508,7 @@ public class GridCacheContext<K, V> implements Externalizable { Collection<ClusterNode> dhtRemoteNodes = F.view(dhtNodes, F.remoteNodes(nodeId())); // Exclude local node. - boolean ret = map(entry, dhtRemoteNodes, dhtMap); + map(entry, dhtRemoteNodes, dhtMap); Collection<ClusterNode> nearRemoteNodes = null; @@ -1530,7 +1529,7 @@ public class GridCacheContext<K, V> implements Externalizable { if (nearNodes != null && !nearNodes.isEmpty()) { nearRemoteNodes = F.view(nearNodes, F.notIn(dhtNodes)); - ret |= map(entry, nearRemoteNodes, nearMap); + map(entry, nearRemoteNodes, nearMap); } } @@ -1540,8 +1539,6 @@ public class GridCacheContext<K, V> implements Externalizable { entry.mappings(explicitLockVer, dhtNodeIds, nearNodeIds); } - - return ret; } /** @@ -1549,10 +1546,9 @@ public class GridCacheContext<K, V> implements Externalizable { * @param log Log. * @param dhtMap Dht mappings. * @param nearMap Near mappings. - * @return {@code True} if mapped. * @throws GridCacheEntryRemovedException If reader for entry is removed. */ - public boolean dhtMap( + public void dhtMap( GridDhtCacheEntry entry, GridCacheVersion explicitLockVer, IgniteLogger log, @@ -1571,27 +1567,20 @@ public class GridCacheContext<K, V> implements Externalizable { Collection<ClusterNode> nearNodes = cand.mappedNearNodes(); - boolean ret = map(entry, dhtNodes, dhtMap); + map(entry, dhtNodes, dhtMap); if (nearNodes != null && !nearNodes.isEmpty()) - ret |= map(entry, nearNodes, nearMap); - - return ret; + map(entry, nearNodes, nearMap); } - - return false; } /** * @param entry Entry. * @param nodes Nodes. * @param map Map. - * @return {@code True} if mapped. */ - private boolean map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes, + private void map(GridDhtCacheEntry entry, Iterable<ClusterNode> nodes, Map<ClusterNode, List<GridDhtCacheEntry>> map) { - boolean ret = false; - if (nodes != null) { for (ClusterNode n : nodes) { List<GridDhtCacheEntry> entries = map.get(n); @@ -1600,12 +1589,8 @@ public class GridCacheContext<K, V> implements Externalizable { map.put(n, entries = new LinkedList<>()); entries.add(entry); - - ret = true; } } - - return ret; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 50b01c8..af62e39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -389,7 +389,6 @@ public interface GridCacheEntryEx { * @param tx Cache transaction. * @param evtNodeId ID of node responsible for this change. * @param affNodeId Partitioned node iD. - * @param writeThrough If {@code true}, persist to the storage. * @param retval {@code True} if value should be returned (and unmarshalled if needed). * @param evt Flag to signal event notification. * @param metrics Flag to signal metrics notification. @@ -409,7 +408,6 @@ public interface GridCacheEntryEx { @Nullable IgniteInternalTx tx, UUID evtNodeId, UUID affNodeId, - boolean writeThrough, boolean retval, boolean evt, boolean metrics, @@ -1014,4 +1012,4 @@ public interface GridCacheEntryEx { * Calls {@link GridDhtLocalPartition#onUnlock()} for this entry's partition. */ public void onUnlock(); -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ca0995a..df9f5c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1108,7 +1108,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheLazyEntry e = new CacheLazyEntry(cctx, key, old); - Object interceptorVal = cctx.config().getInterceptor().onBeforePut(new CacheLazyEntry(cctx, key, old), + Object interceptorVal = cctx.config().getInterceptor().onBeforePut( + new CacheLazyEntry(cctx, key, old), val0); key0 = e.key(); @@ -1212,7 +1213,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Nullable IgniteInternalTx tx, UUID evtNodeId, UUID affNodeId, - boolean writeThrough, boolean retval, boolean evt, boolean metrics, @@ -1244,6 +1244,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Cache.Entry entry0 = null; + boolean deferred; + + boolean marked = false; + synchronized (this) { checkObsolete(); @@ -1349,40 +1353,33 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme cctx.continuousQueries().onEntryUpdated(this, key, null, old, false); cctx.dataStructures().onEntryUpdated(key, true); - } - - // Persist outside of synchronization. The correctness of the - // value will be handled by current transaction. - if (writeThrough) - cctx.store().remove(tx, keyValue(false)); - if (cctx.deferredDelete() && !detached() && !isInternal()) - cctx.onDeferredDelete(this, newVer); - else { - boolean marked = false; + deferred = cctx.deferredDelete() && !detached() && !isInternal(); - synchronized (this) { + if (!deferred) { // If entry is still removed. - if (newVer == ver) { - if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) { - if (log.isDebugEnabled()) - log.debug("Entry could not be marked obsolete (it is still used): " + this); - } - else { - recordNodeId(affNodeId, topVer); + assert newVer == ver; - // If entry was not marked obsolete, then removed lock - // will be registered whenever removeLock is called. - cctx.mvcc().addRemoved(cctx, obsoleteVer); + if (obsoleteVer == null || !(marked = markObsolete0(obsoleteVer, true, null))) { + if (log.isDebugEnabled()) + log.debug("Entry could not be marked obsolete (it is still used): " + this); + } + else { + recordNodeId(affNodeId, topVer); - if (log.isDebugEnabled()) - log.debug("Entry was marked obsolete: " + this); - } + if (log.isDebugEnabled()) + log.debug("Entry was marked obsolete: " + this); } } + } - if (marked) - onMarkedObsolete(); + if (deferred) + cctx.onDeferredDelete(this, newVer); + + if (marked) { + assert !deferred; + + onMarkedObsolete(); } if (intercept) @@ -4247,4 +4244,4 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return "IteratorEntry [key=" + key + ']'; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 0960c9d..2c14209 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -24,7 +25,6 @@ import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.Map; -import java.util.Queue; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; @@ -57,6 +57,7 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; @@ -88,7 +89,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private ConcurrentMap<Long, GridCacheExplicitLockSpan> pendingExplicit; /** Set of removed lock versions. */ - private Collection<GridCacheVersion> rmvLocks = + private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks = new GridBoundedConcurrentLinkedHashSet<>(MAX_REMOVED_LOCKS, MAX_REMOVED_LOCKS, 0.75f, 16, PER_SEGMENT_Q); /** Current local candidates. */ @@ -114,7 +115,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private final ConcurrentMap<GridCacheVersion, GridCacheVersion> near2dht = newMap(); /** Finish futures. */ - private final Queue<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>(); + private final ConcurrentLinkedDeque8<FinishLockFuture> finishFuts = new ConcurrentLinkedDeque8<>(); /** Logger. */ @SuppressWarnings( {"FieldAccessedSynchronizedAndUnsynchronized"}) @@ -143,17 +144,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { Collection<? extends GridCacheFuture> futCol = futs.get(owner.version()); if (futCol != null) { - for (GridCacheFuture fut : futCol) { - if (fut instanceof GridCacheMvccFuture && !fut.isDone()) { - GridCacheMvccFuture<Boolean> mvccFut = - (GridCacheMvccFuture<Boolean>)fut; - - // Since this method is called outside of entry synchronization, - // we can safely invoke any method on the future. - // Also note that we don't remove future here if it is done. - // The removal is initiated from within future itself. - if (mvccFut.onOwnerChanged(entry, owner)) - return; + synchronized (futCol) { + for (GridCacheFuture fut : futCol) { + if (fut instanceof GridCacheMvccFuture && !fut.isDone()) { + GridCacheMvccFuture<Boolean> mvccFut = (GridCacheMvccFuture<Boolean>)fut; + + // Since this method is called outside of entry synchronization, + // we can safely invoke any method on the future. + // Also note that we don't remove future here if it is done. + // The removal is initiated from within future itself. + if (mvccFut.onOwnerChanged(entry, owner)) + return; + } } } } @@ -171,8 +173,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { else if (log.isDebugEnabled()) log.debug("Failed to find transaction for changed owner: " + owner); - for (FinishLockFuture f : finishFuts) - f.recheck(entry); + if (!finishFuts.isEmptyx()) { + for (FinishLockFuture f : finishFuts) + f.recheck(entry); + } } /** {@inheritDoc} */ @@ -203,21 +207,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { if (log.isDebugEnabled()) log.debug("Processing node left [nodeId=" + discoEvt.eventNode().id() + "]"); - for (Collection<GridCacheFuture<?>> futsCol : futs.values()) { - for (GridCacheFuture<?> fut : futsCol) { - if (!fut.trackable()) { - if (log.isDebugEnabled()) - log.debug("Skipping non-trackable future: " + fut); - - continue; - } - - fut.onNodeLeft(discoEvt.eventNode().id()); - - if (fut.isCancelled() || fut.isDone()) - removeFuture(fut); - } - } + for (GridCacheFuture<?> fut : activeFutures()) + fut.onNodeLeft(discoEvt.eventNode().id()); for (IgniteInternalFuture<?> fut : atomicFuts.values()) { if (fut instanceof GridCacheFuture) { @@ -272,7 +263,15 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @return Collection of active futures. */ public Collection<GridCacheFuture<?>> activeFutures() { - return F.flatCollections(futs.values()); + ArrayList<GridCacheFuture<?>> col = new ArrayList<>(); + + for (Collection<GridCacheFuture<?>> verFuts : futs.values()) { + synchronized (verFuts) { + col.addAll(verFuts); + } + } + + return col; } /** @@ -345,10 +344,8 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @param err Error. */ private void cancelClientFutures(IgniteCheckedException err) { - for (Collection<GridCacheFuture<?>> futures : futs.values()) { - for (GridCacheFuture<?> future : futures) - ((GridFutureAdapter)future).onDone(err); - } + for (GridCacheFuture<?> fut : activeFutures()) + ((GridFutureAdapter)fut).onDone(err); for (GridCacheAtomicFuture<?> future : atomicFuts.values()) ((GridFutureAdapter)future).onDone(err); @@ -444,11 +441,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { return true; while (true) { - Collection<GridCacheFuture<?>> old = futs.putIfAbsent(fut.version(), - new ConcurrentLinkedDeque8<GridCacheFuture<?>>() { - /** */ - private int hash; + Collection<GridCacheFuture<?>> old = futs.get(fut.version()); + if (old == null) { + Collection<GridCacheFuture<?>> col = new HashSet<GridCacheFuture<?>>(U.capacity(4), 0.75f) { { // Make sure that we add future to queue before // adding queue to the map of futures. @@ -456,16 +452,16 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } @Override public int hashCode() { - if (hash == 0) - hash = System.identityHashCode(this); - - return hash; + return System.identityHashCode(this); } @Override public boolean equals(Object obj) { return obj == this; } - }); + }; + + old = futs.putIfAbsent(fut.version(), col); + } if (old != null) { boolean empty, dup = false; @@ -474,10 +470,7 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { empty = old.isEmpty(); if (!empty) - dup = old.contains(fut); - - if (!empty && !dup) - old.add(fut); + dup = !old.add(fut); } // Future is being removed, so we force-remove here and try again. @@ -594,14 +587,18 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { @Nullable public GridCacheFuture future(GridCacheVersion ver, IgniteUuid futId) { Collection<? extends GridCacheFuture> futs = this.futs.get(ver); - if (futs != null) - for (GridCacheFuture<?> fut : futs) - if (fut.futureId().equals(futId)) { - if (log.isDebugEnabled()) - log.debug("Found future in futures map: " + fut); + if (futs != null) { + synchronized (futs) { + for (GridCacheFuture<?> fut : futs) { + if (fut.futureId().equals(futId)) { + if (log.isDebugEnabled()) + log.debug("Found future in futures map: " + fut); - return fut; + return fut; + } } + } + } if (log.isDebugEnabled()) log.debug("Failed to find future in futures map [ver=" + ver + ", futId=" + futId + ']'); @@ -619,7 +616,13 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { public <T> Collection<? extends IgniteInternalFuture<T>> futures(GridCacheVersion ver) { Collection c = futs.get(ver); - return c == null ? Collections.<IgniteInternalFuture<T>>emptyList() : (Collection<IgniteInternalFuture<T>>)c; + if (c == null) + return Collections.<IgniteInternalFuture<T>>emptyList(); + else { + synchronized (c) { + return new ArrayList<>((Collection<IgniteInternalFuture<T>>)c); + } + } } /** @@ -949,12 +952,12 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { @Override public void printMemoryStats() { X.println(">>> "); X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']'); - X.println(">>> rmvLocksSize: " + rmvLocks.size()); + X.println(">>> rmvLocksSize: " + rmvLocks.sizex()); X.println(">>> dhtLocCandsSize: " + dhtLocCands.size()); X.println(">>> lockedSize: " + locked.size()); X.println(">>> futsSize: " + futs.size()); X.println(">>> near2dhtSize: " + near2dht.size()); - X.println(">>> finishFutsSize: " + finishFuts.size()); + X.println(">>> finishFutsSize: " + finishFuts.sizex()); } /** @@ -974,9 +977,11 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { public Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> unfinishedLocks(AffinityTopologyVersion topVer) { Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> cands = new HashMap<>(); - for (FinishLockFuture fut : finishFuts) { - if (fut.topologyVersion().equals(topVer)) - cands.putAll(fut.pendingLocks()); + if (!finishFuts.isEmptyx()) { + for (FinishLockFuture fut : finishFuts) { + if (fut.topologyVersion().equals(topVer)) + cands.putAll(fut.pendingLocks()); + } } return cands; @@ -1054,8 +1059,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { * @param topVer Topology version. * @return Future that signals when all locks for given partitions will be released. */ - private IgniteInternalFuture<?> finishLocks(@Nullable final IgnitePredicate<GridDistributedCacheEntry> filter, - AffinityTopologyVersion topVer) { + private IgniteInternalFuture<?> finishLocks( + @Nullable final IgnitePredicate<GridDistributedCacheEntry> filter, + AffinityTopologyVersion topVer + ) { assert topVer.topologyVersion() != 0; if (topVer.equals(AffinityTopologyVersion.NONE)) @@ -1069,10 +1076,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { finishFut.listen(new CI1<IgniteInternalFuture<?>>() { @Override public void apply(IgniteInternalFuture<?> e) { finishFuts.remove(finishFut); - - // This call is required to make sure that the concurrent queue - // clears memory occupied by internal nodes. - finishFuts.peek(); } }); @@ -1088,8 +1091,10 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { if (exchLog.isDebugEnabled()) exchLog.debug("Rechecking pending locks for completion."); - for (FinishLockFuture fut : finishFuts) - fut.recheck(); + if (!finishFuts.isEmptyx()) { + for (FinishLockFuture fut : finishFuts) + fut.recheck(); + } } /** @@ -1250,4 +1255,4 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { return S.toString(FinishLockFuture.class, this, super.toString()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index a138d30..a3eb723 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -403,7 +403,7 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { doomed = mvcc == null ? null : mvcc.candidate(ver); - if (doomed == null || doomed.dhtLocal() || (!doomed.local() && !doomed.nearLocal())) + if (doomed == null) addRemoved(ver); GridCacheVersion obsoleteVer = obsoleteVersionExtras(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java index 1e78ba2..2d2d935 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxMapping.java @@ -23,13 +23,13 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridConcurrentLinkedHashSet; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.F; @@ -87,7 +87,7 @@ public class GridDistributedTxMapping implements Externalizable { public GridDistributedTxMapping(ClusterNode node) { this.node = node; - entries = new GridConcurrentLinkedHashSet<>(); + entries = new LinkedHashSet<>(); } /** @@ -297,7 +297,7 @@ public class GridDistributedTxMapping implements Externalizable { */ private void ensureModifiable() { if (readOnly) { - entries = new GridConcurrentLinkedHashSet<>(entries); + entries = new LinkedHashSet<>(entries); readOnly = false; } @@ -330,4 +330,4 @@ public class GridDistributedTxMapping implements Externalizable { @Override public String toString() { return S.toString(GridDistributedTxMapping.class, this, "node", node.id()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index fcbf58d..93303c8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -583,13 +583,13 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter eventNodeId(), nodeId, false, - false, true, true, topVer, null, replicate ? DR_BACKUP : DR_NONE, - near() ? null : explicitVer, CU.subjectId(this, cctx), + near() ? null : explicitVer, + CU.subjectId(this, cctx), resolveTaskName(), dhtVer); else { @@ -629,7 +629,6 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter eventNodeId(), nodeId, false, - false, true, true, topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index c175b0b..579d701 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -782,14 +782,12 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (log.isDebugEnabled()) log.debug("Mapping entry for DHT lock future: " + this); - boolean hasRmtNodes = false; - // Assign keys to primary nodes. for (GridDhtCacheEntry entry : entries) { try { while (true) { try { - hasRmtNodes = cctx.dhtMap( + cctx.dhtMap( nearNodeId, topVer, entry, @@ -823,9 +821,6 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> } } - if (tx != null) - tx.needsCompletedVersions(hasRmtNodes); - if (isDone()) { if (log.isDebugEnabled()) log.debug("Mapping won't proceed because future is done: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 4ce4759..3069afd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -62,9 +62,9 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanSet; -import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI1; @@ -1090,8 +1090,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach // We have to add completed versions for cases when nearLocal and remote transactions // execute concurrently. - res.completedVersions(ctx.tm().committedVersions(req.version()), - ctx.tm().rolledbackVersions(req.version())); + IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(req.version()); + + res.completedVersions(versPair.get1(), versPair.get2()); int i = 0; @@ -1510,8 +1511,10 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach } } - Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver); - Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); + IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver); + + Collection<GridCacheVersion> committed = versPair.get1(); + Collection<GridCacheVersion> rolledback = versPair.get2(); // Backups. for (Map.Entry<ClusterNode, List<KeyCacheObject>> entry : dhtMap.entrySet()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 8c7d985..6de8795 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; @@ -84,7 +83,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { protected Map<UUID, GridDistributedTxMapping> dhtMap = new ConcurrentHashMap8<>(); /** Mapped flag. */ - protected AtomicBoolean mapped = new AtomicBoolean(); + protected volatile boolean mapped; /** */ private long dhtThreadId; @@ -92,9 +91,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** */ protected boolean explicitLock; - /** */ - private boolean needsCompletedVers; - /** Versions of pending locks for entries of this tx. */ private Collection<GridCacheVersion> pendingVers; @@ -141,20 +137,20 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { int taskNameHash ) { super( - cctx, - xidVer, - implicit, - implicitSingle, - sys, - plc, - concurrency, - isolation, - timeout, + cctx, + xidVer, + implicit, + implicitSingle, + sys, + plc, + concurrency, + isolation, + timeout, invalidate, storeEnabled, onePhaseCommit, - txSize, - subjId, + txSize, + subjId, taskNameHash ); @@ -244,16 +240,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { */ protected abstract void sendFinishReply(boolean commit, @Nullable Throwable err); - /** - * @param needsCompletedVers {@code True} if needs completed versions. - */ - public void needsCompletedVersions(boolean needsCompletedVers) { - this.needsCompletedVers |= needsCompletedVers; - } - /** {@inheritDoc} */ @Override public boolean needsCompletedVersions() { - return needsCompletedVers; + return nearOnOriginatingNode; } /** @@ -281,10 +270,10 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * Map explicit locks. */ protected void mapExplicitLocks() { - if (!mapped.get()) { + if (!mapped) { // Explicit locks may participate in implicit transactions only. if (!implicit()) { - mapped.set(true); + mapped = true; return; } @@ -343,7 +332,7 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { if (!F.isEmpty(nearEntryMap)) addNearNodeEntryMapping(nearEntryMap); - mapped.set(true); + mapped = true; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 61975d7..1d6f633 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -20,9 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -58,10 +60,10 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.util.F0; -import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -177,7 +179,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter /** Keys that should be locked. */ @GridToStringInclude - private GridConcurrentHashSet<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); + private final Set<IgniteTxKey> lockKeys = new HashSet<>(); /** Force keys future for correct transforms. */ private IgniteInternalFuture<?> forceKeysFut; @@ -267,7 +269,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (log.isDebugEnabled()) log.debug("Transaction future received owner changed callback: " + entry); - boolean rmv = lockKeys.remove(entry.txKey()); + boolean rmv; + + synchronized (lockKeys) { + rmv = lockKeys.remove(entry.txKey()); + } return rmv && mapIfLocked(); } @@ -293,7 +299,12 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @return {@code True} if all locks are owned. */ private boolean checkLocks() { - return locksReady && lockKeys.isEmpty(); + if (!locksReady) + return false; + + synchronized (lockKeys) { + return lockKeys.isEmpty(); + } } /** {@inheritDoc} */ @@ -495,8 +506,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter txEntry.cached(entry); } - if (tx.optimistic() && txEntry.explicitVersion() == null) - lockKeys.add(txEntry.txKey()); + if (tx.optimistic() && txEntry.explicitVersion() == null) { + synchronized (lockKeys) { + lockKeys.add(txEntry.txKey()); + } + } while (true) { try { @@ -689,7 +703,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter GridCacheVersion min = tx.minVersion(); - res.completedVersions(cctx.tm().committedVersions(min), cctx.tm().rolledbackVersions(min)); + if (tx.needsCompletedVersions()) { + IgnitePair<Collection<GridCacheVersion>> versPair = cctx.tm().versions(min); + + res.completedVersions(versPair.get1(), versPair.get2()); + } res.pending(localDhtPendingVersions(tx.writeEntries(), min)); @@ -987,21 +1005,11 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (err0 != null) { err.compareAndSet(null, err0); + tx.rollbackAsync(); + final GridNearTxPrepareResponse res = createPrepareResponse(err.get()); - tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { - @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { - if (GridDhtTxPrepareFuture.super.onDone(res, res.error())) { - try { - if (replied.compareAndSet(false, true)) - sendPrepareResponse(res); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to send prepare response for transaction: " + tx, e); - } - } - } - }); + onDone(res, res.error()); return; } @@ -1017,20 +1025,16 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter Map<UUID, GridDistributedTxMapping> futDhtMap = new HashMap<>(); Map<UUID, GridDistributedTxMapping> futNearMap = new HashMap<>(); - boolean hasRemoteNodes = false; - // Assign keys to primary nodes. if (!F.isEmpty(writes)) { for (IgniteTxEntry write : writes) - hasRemoteNodes |= map(tx.entry(write.txKey()), futDhtMap, futNearMap); + map(tx.entry(write.txKey()), futDhtMap, futNearMap); } if (!F.isEmpty(reads)) { for (IgniteTxEntry read : reads) - hasRemoteNodes |= map(tx.entry(read.txKey()), futDhtMap, futNearMap); + map(tx.entry(read.txKey()), futDhtMap, futNearMap); } - - tx.needsCompletedVersions(hasRemoteNodes); } if (isDone()) @@ -1223,15 +1227,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param entry Transaction entry. * @param futDhtMap DHT mapping. * @param futNearMap Near mapping. - * @return {@code True} if mapped. */ - private boolean map( + private void map( IgniteTxEntry entry, Map<UUID, GridDistributedTxMapping> futDhtMap, Map<UUID, GridDistributedTxMapping> futNearMap ) { if (entry.cached().isLocal()) - return false; + return; GridDhtCacheEntry cached = (GridDhtCacheEntry)entry.cached(); @@ -1247,8 +1250,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter entry.ttl(CU.toTtl(expiry.getExpiryForAccess())); } - boolean ret; - while (true) { try { Collection<ClusterNode> dhtNodes = dht.topology().nodes(cached.partition(), tx.topologyVersion()); @@ -1272,10 +1273,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter log.debug("Entry has no near readers: " + entry); // Exclude local node. - ret = map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap); + map(entry, F.view(dhtNodes, F.remoteNodes(cctx.localNodeId())), dhtMap, futDhtMap); // Exclude DHT nodes. - ret |= map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap); + map(entry, F.view(nearNodes, F0.notIn(dhtNodes)), nearMap, futNearMap); break; } @@ -1285,8 +1286,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter entry.cached(cached); } } - - return ret; } /** @@ -1294,16 +1293,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @param nodes Nodes. * @param globalMap Map. * @param locMap Exclude map. - * @return {@code True} if mapped. */ - private boolean map( + private void map( IgniteTxEntry entry, Iterable<ClusterNode> nodes, Map<UUID, GridDistributedTxMapping> globalMap, Map<UUID, GridDistributedTxMapping> locMap ) { - boolean ret = false; - if (nodes != null) { for (ClusterNode n : nodes) { GridDistributedTxMapping global = globalMap.get(n.id()); @@ -1332,12 +1328,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter locMap.put(n.id(), loc = new GridDistributedTxMapping(n)); loc.add(entry); - - ret = true; } } - - return ret; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index f8be2a7..e268a88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -39,6 +39,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; @@ -112,19 +113,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { int taskNameHash ) { super( - ctx, - nodeId, - rmtThreadId, - xidVer, - commitVer, + ctx, + nodeId, + rmtThreadId, + xidVer, + commitVer, sys, plc, - concurrency, - isolation, - invalidate, - timeout, + concurrency, + isolation, + invalidate, + timeout, txSize, - subjId, + subjId, taskNameHash ); @@ -138,7 +139,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { readMap = Collections.emptyMap(); - writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f); + writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1); topologyVersion(topVer); } @@ -183,19 +184,19 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { int taskNameHash ) { super( - ctx, - nodeId, - rmtThreadId, - xidVer, - commitVer, + ctx, + nodeId, + rmtThreadId, + xidVer, + commitVer, sys, plc, - concurrency, - isolation, - invalidate, - timeout, + concurrency, + isolation, + invalidate, + timeout, txSize, - subjId, + subjId, taskNameHash ); @@ -207,7 +208,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { this.rmtFutId = rmtFutId; readMap = Collections.emptyMap(); - writeMap = new ConcurrentLinkedHashMap<>(txSize, 1.0f); + writeMap = new ConcurrentLinkedHashMap<>(U.capacity(txSize), 0.75f, 1); topologyVersion(topVer); } http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4cd9e84..7f9edb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -53,7 +53,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; import org.apache.ignite.internal.processors.cache.GridCacheOperation; http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index f03b461..83c220d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -64,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; @@ -688,8 +689,10 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (map == null || map.isEmpty()) return; - Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver); - Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); + IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver); + + Collection<GridCacheVersion> committed = versPair.get1(); + Collection<GridCacheVersion> rolledback = versPair.get2(); for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { ClusterNode n = mapping.getKey(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index af43113..0002180 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -17,8 +17,10 @@ package org.apache.ignite.internal.processors.cache.distributed.near; +import java.util.ArrayDeque; import java.util.Collection; import java.util.List; +import java.util.Queue; import java.util.UUID; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.ignite.IgniteCheckedException; @@ -55,7 +57,6 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; import static org.apache.ignite.transactions.TransactionState.PREPARED; @@ -293,7 +294,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa txMapping = new GridDhtTxMapping(); - ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings = new ConcurrentLinkedDeque8<>(); + Queue<GridDistributedTxMapping> mappings = new ArrayDeque<>(); if (!F.isEmpty(writes)) { for (int cacheId : tx.activeCacheIds()) { @@ -353,7 +354,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * * @param mappings Queue of mappings. */ - private void proceedPrepare(final ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings) { + private void proceedPrepare(final Queue<GridDistributedTxMapping> mappings) { if (isDone()) return; @@ -556,7 +557,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa private AtomicBoolean rcvRes = new AtomicBoolean(false); /** Mappings to proceed prepare. */ - private ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings; + private Queue<GridDistributedTxMapping> mappings; /** * @param m Mapping. @@ -564,7 +565,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa */ MiniFuture( GridDistributedTxMapping m, - ConcurrentLinkedDeque8<GridDistributedTxMapping> mappings + Queue<GridDistributedTxMapping> mappings ) { this.m = m; this.mappings = mappings; http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java index 0e8aa0d..5ab85b2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java @@ -48,6 +48,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -712,8 +713,10 @@ public class GridNearTransactionalCache<K, V> extends GridNearCacheAdapter<K, V> if (map == null || map.isEmpty()) return; - Collection<GridCacheVersion> committed = ctx.tm().committedVersions(ver); - Collection<GridCacheVersion> rolledback = ctx.tm().rolledbackVersions(ver); + IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver); + + Collection<GridCacheVersion> committed = versPair.get1(); + Collection<GridCacheVersion> rolledback = versPair.get2(); for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { ClusterNode n = mapping.getKey(); http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 46c9f3e..a9dbda2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -21,7 +21,6 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; @@ -88,15 +87,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** Commit flag. */ private boolean commit; - /** Error. */ - private AtomicReference<Throwable> err = new AtomicReference<>(null); - /** Node mappings. */ - private ConcurrentMap<UUID, GridDistributedTxMapping> mappings; + private Map<UUID, GridDistributedTxMapping> mappings; /** Trackable flag. */ private boolean trackable = true; + /** */ + private boolean finishOnePhaseCalled; + /** * @param cctx Context. * @param tx Transaction. @@ -176,38 +175,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } /** - * @param e Error. - */ - void onError(Throwable e) { - tx.commitError(e); - - if (err.compareAndSet(null, e)) { - boolean marked = tx.setRollbackOnly(); - - if (e instanceof IgniteTxRollbackCheckedException) { - if (marked) { - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to automatically rollback transaction: " + tx, ex); - } - } - } - else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error. - try { - tx.close(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to invalidate transaction: " + tx, ex); - } - } - - onComplete(); - } - } - - /** * @param nodeId Sender. * @param res Result. */ @@ -247,24 +214,56 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu /** {@inheritDoc} */ @Override public boolean onDone(IgniteInternalTx tx0, Throwable err) { - if ((initialized() || err != null)) { - if (tx.needCheckBackup()) { - assert tx.onePhaseCommit(); + if (isDone()) + return false; - if (err != null) - err = new TransactionRollbackException("Failed to commit transaction.", err); + synchronized (this) { + if (isDone()) + return false; - try { - tx.finish(err == null); + if (err != null) { + tx.commitError(err); + + boolean marked = tx.setRollbackOnly(); + + if (err instanceof IgniteTxRollbackCheckedException) { + if (marked) { + try { + tx.rollback(); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to automatically rollback transaction: " + tx, ex); + } + } } - catch (IgniteCheckedException e) { - if (err != null) - err.addSuppressed(e); - else - err = e; + else if (tx.implicit() && tx.isSystemInvalidate()) { // Finish implicit transaction on heuristic error. + try { + tx.close(); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to invalidate transaction: " + tx, ex); + } } } + if (initialized() || err != null) { + if (tx.needCheckBackup()) { + assert tx.onePhaseCommit(); + + if (err != null) + err = new TransactionRollbackException("Failed to commit transaction.", err); + + try { + tx.finish(err == null); + } + catch (IgniteCheckedException e) { + if (err != null) + err.addSuppressed(e); + else + err = e; + } + } + if (tx.onePhaseCommit()) { boolean commit = this.commit && err == null; @@ -273,36 +272,35 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu tx.tmFinish(commit); } - Throwable th = this.err.get(); - - if (super.onDone(tx0, th != null ? th : err)) { - if (error() instanceof IgniteTxHeuristicCheckedException) { - AffinityTopologyVersion topVer = tx.topologyVersion(); + if (super.onDone(tx0, err)) { + if (error() instanceof IgniteTxHeuristicCheckedException) { + AffinityTopologyVersion topVer = tx.topologyVersion(); - for (IgniteTxEntry e : tx.writeMap().values()) { - GridCacheContext cacheCtx = e.context(); + for (IgniteTxEntry e : tx.writeMap().values()) { + GridCacheContext cacheCtx = e.context(); - try { - if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) { - GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key()); + try { + if (e.op() != NOOP && !cacheCtx.affinity().localNode(e.key(), topVer)) { + GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key()); - if (entry != null) - entry.invalidate(null, tx.xidVersion()); + if (entry != null) + entry.invalidate(null, tx.xidVersion()); + } } - } - catch (Throwable t) { - U.error(log, "Failed to invalidate entry.", t); + catch (Throwable t) { + U.error(log, "Failed to invalidate entry.", t); - if (t instanceof Error) - throw (Error)t; + if (t instanceof Error) + throw (Error)t; + } } } - } - // Don't forget to clean up. - cctx.mvcc().removeFuture(this); + // Don't forget to clean up. + cctx.mvcc().removeFuture(this); - return true; + return true; + } } } @@ -321,7 +319,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * Completeness callback. */ private void onComplete() { - onDone(tx, err.get()); + onDone(tx); } /** @@ -354,7 +352,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu markInitialized(); - if (!isSync()) { + if (!isSync() && !isDone()) { boolean complete = true; for (IgniteInternalFuture<?> f : pending()) @@ -367,15 +365,15 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu } } else - onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx))); + onDone(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(tx))); } catch (Error | RuntimeException e) { - onError(e); + onDone(e); throw e; } catch (IgniteCheckedException e) { - onError(e); + onDone(e); } } @@ -415,7 +413,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu "(backup has left grid): " + tx.xidVersion(), cause)); } else if (backup.isLocal()) { - boolean committed = cctx.tm().txHandler().checkDhtRemoteTxCommitted(tx.xidVersion()); + boolean committed = !cctx.tm().addRolledbackTx(tx); readyNearMappingFromBackup(mapping); @@ -515,6 +513,13 @@ public final class GridNearTxFinishFuture<K, V> extends GridCompoundIdentityFutu * @param commit Commit flag. */ private void finishOnePhase(boolean commit) { + assert Thread.holdsLock(this); + + if (finishOnePhaseCalled) + return; + + finishOnePhaseCalled = true; + // No need to send messages as transaction was already committed on remote node. // Finish local mapping only as we need send commit message to backups. for (GridDistributedTxMapping m : mappings.values()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 883c285..db4a4b8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -23,7 +23,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import javax.cache.expiry.ExpiryPolicy; import org.apache.ignite.IgniteCheckedException; @@ -88,7 +87,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { private static final long serialVersionUID = 0L; /** DHT mappings. */ - private ConcurrentMap<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>(); + private Map<UUID, GridDistributedTxMapping> mappings = new ConcurrentHashMap8<>(); /** Future. */ @GridToStringExclude @@ -217,7 +216,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override protected IgniteInternalFuture<Boolean> addReader( - long msgId, + long msgId, GridDhtCacheEntry cached, IgniteTxEntry entry, AffinityTopologyVersion topVer @@ -472,7 +471,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** * @return DHT map. */ - ConcurrentMap<UUID, GridDistributedTxMapping> mappings() { + Map<UUID, GridDistributedTxMapping> mappings() { return mappings; } @@ -798,14 +797,14 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { catch (Error | RuntimeException e) { commitErr.compareAndSet(null, e); - fut0.onError(e); + fut0.onDone(e); throw e; } catch (IgniteCheckedException e) { commitErr.compareAndSet(null, e); - fut0.onError(e); + fut0.onDone(e); } } }); @@ -1152,8 +1151,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { /** {@inheritDoc} */ @Override protected GridCacheEntryEx entryEx( - GridCacheContext cacheCtx, - IgniteTxKey key, + GridCacheContext cacheCtx, + IgniteTxKey key, AffinityTopologyVersion topVer ) { if (cacheCtx.isColocated()) { @@ -1245,7 +1244,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { @Override public void onRemap(AffinityTopologyVersion topVer) { assert cctx.kernalContext().clientNode(); - mapped.set(false); + mapped = false; nearLocallyMapped = false; colocatedLocallyMapped = false; txNodes = null; @@ -1254,7 +1253,9 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter { dhtMap.clear(); mappings.clear(); - this.topVer.set(topVer); + synchronized (this) { + this.topVer = topVer; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/a4848a70/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 20fb8c2..94af6bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -536,9 +536,8 @@ public interface IgniteInternalTx extends AutoCloseable, GridTimeoutObject { /** * @param commitVer Commit version. - * @return {@code True} if version was set. */ - public boolean commitVersion(GridCacheVersion commitVer); + public void commitVersion(GridCacheVersion commitVer); /** * @return End version (a.k.a. <tt>'tnc'</tt> or <tt>'transaction number counter'</tt>)