Performance optimizations - reviewed by Yakov.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/175b7f24 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/175b7f24 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/175b7f24 Branch: refs/heads/ignite-single-op-get Commit: 175b7f24e1d62a90e7a7159ad670036216e6d278 Parents: 4c9ea58 Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Wed Nov 18 19:20:45 2015 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Wed Nov 18 19:20:45 2015 +0300 ---------------------------------------------------------------------- .../ignite/codegen/MessageCodeGenerator.java | 11 +- .../communication/GridIoMessageFactory.java | 6 + .../discovery/GridDiscoveryManager.java | 2 +- .../cache/GridCacheDeploymentManager.java | 2 +- .../processors/cache/GridCacheGateway.java | 1 - .../processors/cache/GridCacheMvcc.java | 7 -- .../processors/cache/GridCacheMvccManager.java | 42 ------- .../GridCachePartitionExchangeManager.java | 55 ++++++++- .../cache/GridCacheSharedContext.java | 7 +- .../distributed/GridCacheTxRecoveryFuture.java | 41 +++++-- .../distributed/GridDistributedBaseMessage.java | 56 --------- .../distributed/GridDistributedLockRequest.java | 6 - .../GridDistributedLockResponse.java | 32 +----- .../GridDistributedTxPrepareRequest.java | 67 +++++++++-- .../distributed/dht/GridDhtLockFuture.java | 63 ++++++---- .../distributed/dht/GridDhtLockRequest.java | 2 +- .../dht/GridDhtTransactionalCacheAdapter.java | 5 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 9 ++ .../distributed/dht/GridDhtTxPrepareFuture.java | 60 ++++++---- .../dht/GridDhtTxPrepareRequest.java | 54 ++++----- .../dht/colocated/GridDhtColocatedCache.java | 2 +- .../colocated/GridDhtColocatedLockFuture.java | 55 ++++++--- .../distributed/near/GridNearLockFuture.java | 56 ++++++--- .../distributed/near/GridNearLockRequest.java | 4 +- ...arOptimisticSerializableTxPrepareFuture.java | 91 +++++++++------ .../near/GridNearOptimisticTxPrepareFuture.java | 50 +++++--- .../GridNearPessimisticTxPrepareFuture.java | 39 +++++-- .../near/GridNearTransactionalCache.java | 7 +- .../near/GridNearTxFinishFuture.java | 18 ++- .../cache/distributed/near/GridNearTxLocal.java | 15 --- .../near/GridNearTxPrepareFutureAdapter.java | 14 ++- .../near/GridNearTxPrepareRequest.java | 52 ++++----- .../cache/transactions/IgniteInternalTx.java | 1 + .../cache/transactions/IgniteTxAdapter.java | 1 + .../cache/transactions/IgniteTxEntry.java | 9 +- .../IgniteTxImplicitSingleStateImpl.java | 7 ++ .../transactions/IgniteTxLocalAdapter.java | 12 +- .../cache/transactions/IgniteTxManager.java | 1 + .../IgniteTxRemoteStateAdapter.java | 5 + .../cache/transactions/IgniteTxState.java | 6 + .../cache/transactions/IgniteTxStateImpl.java | 69 ++++++++--- .../clock/GridClockSyncProcessor.java | 28 ++++- .../internal/util/UUIDCollectionMessage.java | 114 +++++++++++++++++++ .../util/future/GridCompoundFuture.java | 15 ++- .../ignite/internal/util/lang/GridFunc.java | 8 +- .../ignite/internal/util/nio/GridNioServer.java | 13 ++- .../IgniteCacheTxStoreSessionTest.java | 2 +- .../testsuites/IgniteCacheTestSuite3.java | 2 + 48 files changed, 805 insertions(+), 419 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java ---------------------------------------------------------------------- diff --git a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java index 8d9a3f5..74c71c4 100644 --- a/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java +++ b/modules/codegen/src/main/java/org/apache/ignite/codegen/MessageCodeGenerator.java @@ -43,6 +43,10 @@ import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.IgniteCodeGeneratingFail; +import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; +import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -165,7 +169,12 @@ public class MessageCodeGenerator { MessageCodeGenerator gen = new MessageCodeGenerator(srcDir); - gen.generateAll(true); + gen.generateAndWrite(GridDistributedTxPrepareRequest.class); + gen.generateAndWrite(GridDhtTxPrepareRequest.class); + gen.generateAndWrite(GridNearTxPrepareRequest.class); + gen.generateAndWrite(UUIDCollectionMessage.class); + +// gen.generateAll(true); // gen.generateAndWrite(DataStreamerEntry.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index ae8c753..2503eda 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -121,6 +121,7 @@ import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultRe import org.apache.ignite.internal.processors.rest.handlers.task.GridTaskResultResponse; import org.apache.ignite.internal.util.GridByteArrayList; import org.apache.ignite.internal.util.GridLongList; +import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.plugin.extensions.communication.MessageFactory; @@ -690,6 +691,11 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 115: + msg = new UUIDCollectionMessage(); + + break; + // [-3..114] - this // [120..123] - DR // [-4..-22] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java index cd2f49c..4880338 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java @@ -2136,7 +2136,7 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> { customEvt.node(ctx.discovery().localNode()); customEvt.eventNode(node); customEvt.type(type); - customEvt.topologySnapshot(topVer.topologyVersion(), null); + customEvt.topologySnapshot(topVer.topologyVersion(), evt.get4()); customEvt.affinityTopologyVersion(topVer); customEvt.customMessage(evt.get5()); http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java index 40c5b0f..35e8b75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheDeploymentManager.java @@ -164,7 +164,7 @@ public class GridCacheDeploymentManager<K, V> extends GridCacheSharedManagerAdap * Callback on method enter. */ public void onEnter() { - if (!locDepOwner && depEnabled && !ignoreOwnership.get() + if (depEnabled && !locDepOwner && !ignoreOwnership.get() && !cctx.kernalContext().job().internal()) { ClassLoader ldr = Thread.currentThread().getContextClassLoader(); http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java index 0eac5ba..1562d70 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheGateway.java @@ -109,7 +109,6 @@ public class GridCacheGateway<K, V> { rwLock.readLock(); return checkState(true, false); - } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index 12583ad..adcbf92 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -348,9 +348,6 @@ public final class GridCacheMvcc { reassign(); - if (cand.local()) - cctx.mvcc().removeLocal(cand); - return true; } } @@ -596,8 +593,6 @@ public final class GridCacheMvcc { ); if (serOrder == null) { - cctx.mvcc().addLocal(cand); - boolean add = add0(cand); assert add : cand; @@ -605,8 +600,6 @@ public final class GridCacheMvcc { else { if (!add0(cand)) return null; - - cctx.mvcc().addLocal(cand); } return cand; http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java index 2c14209..8562f37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvccManager.java @@ -29,7 +29,6 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListSet; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; @@ -92,9 +91,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { private GridBoundedConcurrentLinkedHashSet<GridCacheVersion> rmvLocks = new GridBoundedConcurrentLinkedHashSet<>(MAX_REMOVED_LOCKS, MAX_REMOVED_LOCKS, 0.75f, 16, PER_SEGMENT_Q); - /** Current local candidates. */ - private Collection<GridCacheMvccCandidate> dhtLocCands = new ConcurrentSkipListSet<>(); - /** Locked keys. */ @GridToStringExclude private final ConcurrentMap<IgniteTxKey, GridDistributedCacheEntry> locked = newMap(); @@ -707,43 +703,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { } /** - * @param cand Local lock. - * @return {@code True} if added. - */ - public boolean addLocal(GridCacheMvccCandidate cand) { - assert cand.key() != null : cand; - assert cand.local() : cand; - - if (cand.dhtLocal() && dhtLocCands.add(cand)) { - if (log.isDebugEnabled()) - log.debug("Added local candidate: " + cand); - - return true; - } - - return false; - } - - /** - * - * @param cand Local candidate to remove. - * @return {@code True} if removed. - */ - public boolean removeLocal(GridCacheMvccCandidate cand) { - assert cand.key() != null : cand; - assert cand.local() : cand; - - if (cand.dhtLocal() && dhtLocCands.remove(cand)) { - if (log.isDebugEnabled()) - log.debug("Removed local candidate: " + cand); - - return true; - } - - return false; - } - - /** * @param cacheCtx Cache context. * @param cand Cache lock candidate to add. * @return {@code True} if added as a result of this operation, @@ -953,7 +912,6 @@ public class GridCacheMvccManager extends GridCacheSharedManagerAdapter { X.println(">>> "); X.println(">>> Mvcc manager memory stats [grid=" + cctx.gridName() + ']'); X.println(">>> rmvLocksSize: " + rmvLocks.sizex()); - X.println(">>> dhtLocCandsSize: " + dhtLocCands.size()); X.println(">>> lockedSize: " + locked.size()); X.println(">>> futsSize: " + futs.size()); X.println(">>> near2dhtSize: " + near2dht.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 81ff028..e19b310 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -30,6 +30,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -66,6 +67,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.GridListSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI1; @@ -77,6 +79,7 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; @@ -134,6 +137,10 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana private final ConcurrentMap<AffinityTopologyVersion, AffinityReadyFuture> readyFuts = new ConcurrentHashMap8<>(); /** */ + private final ConcurrentSkipListMap<AffinityTopologyVersion, IgnitePair<IgniteProductVersion>> nodeVers = + new ConcurrentSkipListMap<>(); + + /** */ private final AtomicReference<AffinityTopologyVersion> readyTopVer = new AtomicReference<>(AffinityTopologyVersion.NONE); @@ -572,6 +579,30 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana } /** + * Gets minimum node version for the given topology version. + * + * @param topVer Topology version to get minimum node version for. + * @return Minimum node version. + */ + public IgniteProductVersion minimumNodeVersion(AffinityTopologyVersion topVer) { + IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer); + + return vers == null ? cctx.localNode().version() : vers.get1(); + } + + /** + * Gets maximum node version for the given topology version. + * + * @param topVer Topology version to get maximum node version for. + * @return Maximum node version. + */ + public IgniteProductVersion maximumNodeVersion(AffinityTopologyVersion topVer) { + IgnitePair<IgniteProductVersion> vers = nodeVers.get(topVer); + + return vers == null ? cctx.localNode().version() : vers.get2(); + } + + /** * @return {@code true} if entered to busy state. */ private boolean enterBusy() { @@ -832,6 +863,28 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana if (log.isDebugEnabled()) log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']'); + IgniteProductVersion minVer = cctx.localNode().version(); + IgniteProductVersion maxVer = cctx.localNode().version(); + + if (err == null) { + if (!F.isEmpty(exchFut.discoveryEvent().topologyNodes())) { + for (ClusterNode node : exchFut.discoveryEvent().topologyNodes()) { + IgniteProductVersion ver = node.version(); + + if (ver.compareTo(minVer) < 0) + minVer = ver; + + if (ver.compareTo(maxVer) > 0) + maxVer = ver; + } + } + } + + nodeVers.put(topVer, new IgnitePair<>(minVer, maxVer)); + + for (AffinityTopologyVersion oldVer : nodeVers.headMap(new AffinityTopologyVersion(topVer.topologyVersion() - 10, 0)).keySet()) + nodeVers.remove(oldVer); + if (err == null) { while (true) { AffinityTopologyVersion readyVer = readyTopVer.get(); @@ -1050,7 +1103,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana int cnt = 0; - for (GridDhtPartitionsExchangeFuture fut : exchFuts) { + for (GridDhtPartitionsExchangeFuture fut : exchFuts.values()) { U.warn(log, ">>> " + fut); if (++cnt == 10) http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 4293b90..608829a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; import org.apache.ignite.internal.processors.cache.transactions.TransactionMetricsAdapter; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionManager; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -533,7 +534,7 @@ public class GridCacheSharedContext<K, V> { * @param cacheCtx Cache context. * @return Error message if transactions are incompatible. */ - @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, Iterable<Integer> activeCacheIds, + @Nullable public String verifyTxCompatibility(IgniteInternalTx tx, GridLongList activeCacheIds, GridCacheContext<K, V> cacheCtx) { if (cacheCtx.systemTx() && !tx.system()) return "system cache can be enlisted only in system transaction"; @@ -541,7 +542,9 @@ public class GridCacheSharedContext<K, V> { if (!cacheCtx.systemTx() && tx.system()) return "non-system cache can't be enlisted in system transaction"; - for (Integer cacheId : activeCacheIds) { + for (int i = 0; i < activeCacheIds.size(); i++) { + int cacheId = (int)activeCacheIds.get(i); + GridCacheContext<K, V> activeCacheCtx = cacheContext(cacheId); if (cacheCtx.systemTx()) { http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index b266c4d..01c4867 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -342,20 +342,45 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea */ public void onResult(UUID nodeId, GridCacheTxRecoveryResponse res) { if (!isDone()) { - for (IgniteInternalFuture<Boolean> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + MiniFuture mini = miniFuture(res.miniId()); - if (f.futureId().equals(res.miniId())) { - assert f.nodeId().equals(nodeId); + if (mini != null) { + assert mini.nodeId().equals(nodeId); - f.onResult(res); + mini.onResult(res); + } + } + } - break; - } + /** + * Finds pending mini future by the given mini ID. + * + * @param miniId Mini ID to find. + * @return Mini future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private MiniFuture miniFuture(IgniteUuid miniId) { + // We iterate directly over the futs collection here to avoid copy. + synchronized (futs) { + // Avoid iterator creation. + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<Boolean> fut = futs.get(i); + + if (!isMini(fut)) + continue; + + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId().equals(miniId)) { + if (!mini.isDone()) + return mini; + else + return null; } } } + + return null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java index f4a16dc..ebbc9ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedBaseMessage.java @@ -21,13 +21,10 @@ import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; -import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheMessage; -import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionable; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -49,15 +46,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem @GridToStringInclude protected GridCacheVersion ver; - /** - * Candidates for every key ordered in the order of keys. These - * can be either local-only candidates in case of lock acquisition, - * or pending candidates in case of transaction commit. - */ - @GridToStringInclude - @GridDirectTransient - private Collection<GridCacheMvccCandidate>[] candsByIdx; - /** */ @GridToStringExclude private byte[] candsByIdxBytes; @@ -108,23 +96,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem this.ver = ver; } - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - - if (candsByIdx != null) - candsByIdxBytes = ctx.marshaller().marshal(candsByIdx); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - - if (candsByIdxBytes != null) - candsByIdx = ctx.marshaller().unmarshal(candsByIdxBytes, ldr); - } - /** {@inheritDoc} */ @Override public boolean addDeploymentInfo() { return addDepInfo; @@ -169,33 +140,6 @@ public abstract class GridDistributedBaseMessage extends GridCacheMessage implem } /** - * @param idx Key index. - * @param candsByIdx List of candidates for that key. - */ - @SuppressWarnings({"unchecked"}) - public void candidatesByIndex(int idx, Collection<GridCacheMvccCandidate> candsByIdx) { - assert idx < cnt; - - // If nothing to add. - if (candsByIdx == null || candsByIdx.isEmpty()) - return; - - if (this.candsByIdx == null) - this.candsByIdx = new Collection[cnt]; - - this.candsByIdx[idx] = candsByIdx; - } - - /** - * @param idx Key index. - * @return Candidates for given key. - */ - public Collection<GridCacheMvccCandidate> candidatesByIndex(int idx) { - return candsByIdx == null || - candsByIdx[idx] == null ? Collections.<GridCacheMvccCandidate>emptyList() : candsByIdx[idx]; - } - - /** * @return Count of keys referenced in candidates array (needed only locally for optimization). */ public int keysCount() { http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java index 2899e25..b584f8a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java @@ -19,14 +19,12 @@ package org.apache.ignite.internal.processors.cache.distributed; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -261,14 +259,12 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { * * @param key Key. * @param retVal Flag indicating whether value should be returned. - * @param cands Candidates. * @param ctx Context. * @throws IgniteCheckedException If failed. */ public void addKeyBytes( KeyCacheObject key, boolean retVal, - @Nullable Collection<GridCacheMvccCandidate> cands, GridCacheContext ctx ) throws IgniteCheckedException { if (keys == null) @@ -276,8 +272,6 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage { keys.add(key); - candidatesByIndex(idx, cands); - retVals[idx] = retVal; idx++; http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java index cdd58b5..bb3f9ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockResponse.java @@ -26,7 +26,6 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -156,34 +155,11 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { } /** - * @param idx Index of locked flag. - * @return Value of locked flag at given index. - */ - public boolean isCurrentlyLocked(int idx) { - assert idx >= 0; - - Collection<GridCacheMvccCandidate> cands = candidatesByIndex(idx); - - for (GridCacheMvccCandidate cand : cands) - if (cand.owner()) - return true; - - return false; - } - - /** - * @param idx Candidates index. - * @param cands Collection of candidates. * @param committedVers Committed versions relative to lock version. * @param rolledbackVers Rolled back versions relative to lock version. */ - public void setCandidates(int idx, Collection<GridCacheMvccCandidate> cands, - Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { - assert idx >= 0; - + public void setCandidates(Collection<GridCacheVersion> committedVers, Collection<GridCacheVersion> rolledbackVers) { completedVersions(committedVers, rolledbackVers); - - candidatesByIndex(idx, cands); } /** @@ -218,9 +194,6 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { prepareMarshalCacheObjects(vals, ctx.cacheContext(cacheId)); -// if (F.isEmpty(valBytes) && !F.isEmpty(vals)) -// valBytes = marshalValuesCollection(vals, ctx); - if (err != null) errBytes = ctx.marshaller().marshal(err); } @@ -231,9 +204,6 @@ public class GridDistributedLockResponse extends GridDistributedBaseMessage { finishUnmarshalCacheObjects(vals, ctx.cacheContext(cacheId), ldr); -// if (F.isEmpty(vals) && !F.isEmpty(valBytes)) -// vals = unmarshalValueBytesCollection(valBytes, ctx, ldr); - if (errBytes != null) err = ctx.marshaller().unmarshal(errBytes, ldr); } http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java index 533c8ca..95176ff 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxPrepareRequest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectMap; import org.apache.ignite.internal.GridDirectTransient; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -34,9 +35,13 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.UUIDCollectionMessage; import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.C1; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; import org.apache.ignite.plugin.extensions.communication.MessageReader; import org.apache.ignite.plugin.extensions.communication.MessageWriter; @@ -52,6 +57,23 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** */ private static final long serialVersionUID = 0L; + /** Version in which direct marshalling of tx nodes was introduced. */ + public static final IgniteProductVersion TX_NODES_DIRECT_MARSHALLABLE_SINCE = IgniteProductVersion.fromString("1.5.0"); + + /** Collection to message converter. */ + public static final C1<Collection<UUID>, UUIDCollectionMessage> COL_TO_MSG = new C1<Collection<UUID>, UUIDCollectionMessage>() { + @Override public UUIDCollectionMessage apply(Collection<UUID> uuids) { + return new UUIDCollectionMessage(uuids); + } + }; + + /** Message to collection converter. */ + public static final C1<UUIDCollectionMessage, Collection<UUID>> MSG_TO_COL = new C1<UUIDCollectionMessage, Collection<UUID>>() { + @Override public Collection<UUID> apply(UUIDCollectionMessage msg) { + return msg.uuids(); + } + }; + /** Thread ID. */ @GridToStringInclude private long threadId; @@ -106,6 +128,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage @GridDirectTransient private Map<UUID, Collection<UUID>> txNodes; + /** Tx nodes direct marshallable message. */ + @GridDirectMap(keyType = UUID.class, valueType = UUIDCollectionMessage.class) + private Map<UUID, UUIDCollectionMessage> txNodesMsg; + /** */ private byte[] txNodesBytes; @@ -302,8 +328,16 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage dhtVerVals = dhtVers.values(); } - if (txNodes != null) - txNodesBytes = ctx.marshaller().marshal(txNodes); + // Marshal txNodes only if there is a node in topology with an older version. + if (ctx.exchange().minimumNodeVersion(topologyVersion()) + .compareTo(TX_NODES_DIRECT_MARSHALLABLE_SINCE) < 0) { + if (txNodes != null && txNodesBytes == null) + txNodesBytes = ctx.marshaller().marshal(txNodes); + } + else { + if (txNodesMsg == null) + txNodesMsg = F.viewReadOnly(txNodes, COL_TO_MSG); + } } /** {@inheritDoc} */ @@ -334,7 +368,10 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage } } - if (txNodesBytes != null) + if (txNodesMsg != null) + txNodes = F.viewReadOnly(txNodesMsg, MSG_TO_COL); + + if (txNodesBytes != null && txNodes == null) txNodes = ctx.marshaller().unmarshal(txNodesBytes, ldr); } @@ -431,18 +468,24 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage writer.incrementState(); case 19: - if (!writer.writeInt("txSize", txSize)) + if (!writer.writeMap("txNodesMsg", txNodesMsg, MessageCollectionItemType.UUID, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("writeVer", writeVer)) + if (!writer.writeInt("txSize", txSize)) return false; writer.incrementState(); case 21: + if (!writer.writeMessage("writeVer", writeVer)) + return false; + + writer.incrementState(); + + case 22: if (!writer.writeCollection("writes", writes, MessageCollectionItemType.MSG)) return false; @@ -569,7 +612,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 19: - txSize = reader.readInt("txSize"); + txNodesMsg = reader.readMap("txNodesMsg", MessageCollectionItemType.UUID, MessageCollectionItemType.MSG, false); if (!reader.isLastRead()) return false; @@ -577,7 +620,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 20: - writeVer = reader.readMessage("writeVer"); + txSize = reader.readInt("txSize"); if (!reader.isLastRead()) return false; @@ -585,6 +628,14 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage reader.incrementState(); case 21: + writeVer = reader.readMessage("writeVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 22: writes = reader.readCollection("writes", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -604,7 +655,7 @@ public class GridDistributedTxPrepareRequest extends GridDistributedBaseMessage /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 22; + return 23; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 579d701..7284fd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -380,10 +380,9 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> * @return Lock candidate. * @throws GridCacheEntryRemovedException If entry was removed. * @throws GridDistributedLockCancelledException If lock is canceled. - * @throws IgniteCheckedException If failed. */ @Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry) - throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException { + throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { if (log.isDebugEnabled()) log.debug("Adding entry: " + entry); @@ -529,35 +528,57 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> if (log.isDebugEnabled()) log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); - boolean found = false; + MiniFuture mini = miniFuture(res.miniId()); - for (IgniteInternalFuture<Boolean> fut : pending()) { - if (isMini(fut)) { - MiniFuture mini = (MiniFuture)fut; + if (mini != null) { + assert mini.node().id().equals(nodeId); - if (mini.futureId().equals(res.miniId())) { - assert mini.node().id().equals(nodeId); + if (log.isDebugEnabled()) + log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); - if (log.isDebugEnabled()) - log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); + mini.onResult(res); - found = true; + if (log.isDebugEnabled()) + log.debug("Futures after processed lock response [fut=" + this + ", mini=" + mini + + ", res=" + res + ']'); - mini.onResult(res); + return; + } - if (log.isDebugEnabled()) - log.debug("Futures after processed lock response [fut=" + this + ", mini=" + mini + - ", res=" + res + ']'); + U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res + + ", fut=" + this + ']'); + } + } - break; - } + /** + * Finds pending mini future by the given mini ID. + * + * @param miniId Mini ID to find. + * @return Mini future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private MiniFuture miniFuture(IgniteUuid miniId) { + // We iterate directly over the futs collection here to avoid copy. + synchronized (futs) { + // Avoid iterator creation. + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<Boolean> fut = futs.get(i); + + if (!isMini(fut)) + continue; + + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId().equals(miniId)) { + if (!mini.isDone()) + return mini; + else + return null; } } - - if (!found) - U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res + - ", fut=" + this + ']'); } + + return null; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java index 91ab1ca..18281d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockRequest.java @@ -236,7 +236,7 @@ public class GridDhtLockRequest extends GridDistributedLockRequest { ) throws IgniteCheckedException { invalidateEntries.set(idx, invalidateEntry); - addKeyBytes(key, false, null, ctx); + addKeyBytes(key, false, ctx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index fe91e5b..35f63e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -39,7 +39,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheReturn; @@ -187,8 +186,6 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach IgniteTxKey txKey = ctx.txKey(key); - assert F.isEmpty(req.candidatesByIndex(i)); - if (log.isDebugEnabled()) log.debug("Unmarshalled key: " + key); @@ -671,7 +668,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (log.isDebugEnabled()) log.debug("Got removed entry when adding lock (will retry): " + entry); } - catch (IgniteCheckedException | GridDistributedLockCancelledException e) { + catch (GridDistributedLockCancelledException e) { if (log.isDebugEnabled()) log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 70ebf3f..55ca12d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -181,6 +181,15 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** + * Gets flag that indicates that originating node has a near cache that participates in this transaction. + * + * @return Has near cache flag. + */ + public boolean nearOnOriginatingNode() { + return nearOnOriginatingNode; + } + + /** * @return {@code True} if explicit lock transaction. */ public boolean explicitLock() { http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index a67950d..d081c0c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -450,20 +450,45 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter */ public void onResult(UUID nodeId, GridDhtTxPrepareResponse res) { if (!isDone()) { - for (IgniteInternalFuture<IgniteInternalTx> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + MiniFuture mini = miniFuture(res.miniId()); - if (f.futureId().equals(res.miniId())) { - assert f.node().id().equals(nodeId); + if (mini != null) { + assert mini.node().id().equals(nodeId); - f.onResult(res); + mini.onResult(res); + } + } + } - break; - } + /** + * Finds pending mini future by the given mini ID. + * + * @param miniId Mini ID to find. + * @return Mini future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private MiniFuture miniFuture(IgniteUuid miniId) { + // We iterate directly over the futs collection here to avoid copy. + synchronized (futs) { + // Avoid iterator creation. + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<IgniteInternalTx> fut = futs.get(i); + + if (!isMini(fut)) + continue; + + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId().equals(miniId)) { + if (!mini.isDone()) + return mini; + else + return null; } } } + + return null; } /** @@ -693,7 +718,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter tx.activeCachesDeploymentEnabled()); if (prepErr == null) { - addDhtValues(res); + if (tx.needReturnValue() || tx.nearOnOriginatingNode() || tx.hasInterceptor()) + addDhtValues(res); GridCacheVersion min = tx.minVersion(); @@ -949,7 +975,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } } } - catch (GridCacheEntryRemovedException e) { + catch (GridCacheEntryRemovedException ignore) { assert false : "Got removed exception on entry with dht local candidate: " + entries; } @@ -1072,18 +1098,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter GridCacheContext<?, ?> cacheCtx = cached.context(); - if (entry.explicitVersion() == null) { - GridCacheMvccCandidate added = cached.candidate(version()); - - assert added != null : "Null candidate for non-group-lock entry " + - "[added=" + added + ", entry=" + entry + ']'; - assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; - - if (added != null && added.ownerVersion() != null) - req.owned(entry.txKey(), added.ownerVersion()); - } - // Do not invalidate near entry on originating transaction node. req.invalidateNearEntry(idx, !tx.nearNodeId().equals(n.id()) && cached.readerId(n.id()) != null); @@ -1092,7 +1106,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter List<ClusterNode> owners = cacheCtx.topology().owners(cached.partition(), tx != null ? tx.topologyVersion() : cacheCtx.affinity().affinityTopologyVersion()); - // Do not preload if local node is partition owner. + // Do not preload if local node is a partition owner. if (!owners.contains(cctx.localNode())) req.markKeyForPreload(idx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java index fcd66c2..394ff89 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareRequest.java @@ -345,79 +345,79 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { } switch (writer.state()) { - case 22: + case 23: if (!writer.writeIgniteUuid("futId", futId)) return false; writer.incrementState(); - case 23: + case 24: if (!writer.writeBitSet("invalidateNearEntries", invalidateNearEntries)) return false; writer.incrementState(); - case 24: + case 25: if (!writer.writeBoolean("last", last)) return false; writer.incrementState(); - case 25: + case 26: if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); - case 26: + case 27: if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); - case 27: + case 28: if (!writer.writeCollection("nearWrites", nearWrites, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 28: + case 29: if (!writer.writeMessage("nearXidVer", nearXidVer)) return false; writer.incrementState(); - case 29: + case 30: if (!writer.writeCollection("ownedKeys", ownedKeys, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 30: + case 31: if (!writer.writeCollection("ownedVals", ownedVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); - case 31: + case 32: if (!writer.writeBitSet("preloadKeys", preloadKeys)) return false; writer.incrementState(); - case 32: + case 33: if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); - case 33: + case 34: if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); - case 34: + case 35: if (!writer.writeMessage("topVer", topVer)) return false; @@ -439,7 +439,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { return false; switch (reader.state()) { - case 22: + case 23: futId = reader.readIgniteUuid("futId"); if (!reader.isLastRead()) @@ -447,7 +447,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 23: + case 24: invalidateNearEntries = reader.readBitSet("invalidateNearEntries"); if (!reader.isLastRead()) @@ -455,7 +455,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 24: + case 25: last = reader.readBoolean("last"); if (!reader.isLastRead()) @@ -463,7 +463,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 25: + case 26: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -471,7 +471,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 26: + case 27: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) @@ -479,7 +479,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 27: + case 28: nearWrites = reader.readCollection("nearWrites", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -487,7 +487,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 28: + case 29: nearXidVer = reader.readMessage("nearXidVer"); if (!reader.isLastRead()) @@ -495,7 +495,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 29: + case 30: ownedKeys = reader.readCollection("ownedKeys", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -503,7 +503,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 30: + case 31: ownedVals = reader.readCollection("ownedVals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -511,7 +511,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 31: + case 32: preloadKeys = reader.readBitSet("preloadKeys"); if (!reader.isLastRead()) @@ -519,7 +519,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 32: + case 33: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -527,7 +527,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 33: + case 34: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -535,7 +535,7 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { reader.incrementState(); - case 34: + case 35: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -555,6 +555,6 @@ public class GridDhtTxPrepareRequest extends GridDistributedTxPrepareRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 35; + return 36; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 83c220d..7131aa5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -873,7 +873,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (log.isDebugEnabled()) log.debug("Got removed entry when adding lock (will retry): " + entry); } - catch (IgniteCheckedException | GridDistributedLockCancelledException e) { + catch (GridDistributedLockCancelledException e) { if (log.isDebugEnabled()) log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 365b46b..abeb509 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -428,25 +428,21 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); - for (IgniteInternalFuture<Boolean> fut : pending()) { - if (isMini(fut)) { - MiniFuture mini = (MiniFuture)fut; + MiniFuture mini = miniFuture(res.miniId()); - if (mini.futureId().equals(res.miniId())) { - assert mini.node().id().equals(nodeId); + if (mini != null) { + assert mini.node().id().equals(nodeId); - if (log.isDebugEnabled()) - log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); + if (log.isDebugEnabled()) + log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); - mini.onResult(res); + mini.onResult(res); - if (log.isDebugEnabled()) - log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini + - ", res=" + res + ']'); + if (log.isDebugEnabled()) + log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini + + ", res=" + res + ']'); - return; - } - } + return; } U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res + @@ -458,6 +454,37 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture } /** + * Finds pending mini future by the given mini ID. + * + * @param miniId Mini ID to find. + * @return Mini future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private MiniFuture miniFuture(IgniteUuid miniId) { + // We iterate directly over the futs collection here to avoid copy. + synchronized (futs) { + // Avoid iterator creation. + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<Boolean> fut = futs.get(i); + + if (!isMini(fut)) + continue; + + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId().equals(miniId)) { + if (!mini.isDone()) + return mini; + else + return null; + } + } + } + + return null; + } + + /** * @param t Error. */ private void onError(Throwable t) { http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index c5b55bd..9c3701f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -478,25 +478,21 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean if (log.isDebugEnabled()) log.debug("Received lock response from node [nodeId=" + nodeId + ", res=" + res + ", fut=" + this + ']'); - for (IgniteInternalFuture<Boolean> fut : pending()) { - if (isMini(fut)) { - MiniFuture mini = (MiniFuture)fut; + MiniFuture mini = miniFuture(res.miniId()); - if (mini.futureId().equals(res.miniId())) { - assert mini.node().id().equals(nodeId); + if (mini != null) { + assert mini.node().id().equals(nodeId); - if (log.isDebugEnabled()) - log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); + if (log.isDebugEnabled()) + log.debug("Found mini future for response [mini=" + mini + ", res=" + res + ']'); - mini.onResult(res); + mini.onResult(res); - if (log.isDebugEnabled()) - log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini + - ", res=" + res + ']'); + if (log.isDebugEnabled()) + log.debug("Future after processed lock response [fut=" + this + ", mini=" + mini + + ", res=" + res + ']'); - return; - } - } + return; } U.warn(log, "Failed to find mini future for response (perhaps due to stale message) [res=" + res + @@ -508,6 +504,38 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean } /** + * Finds pending mini future by the given mini ID. + * + * @param miniId Mini ID to find. + * @return Mini future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private MiniFuture miniFuture(IgniteUuid miniId) { + // We iterate directly over the futs collection here to avoid copy. + synchronized (futs) { + // Avoid iterator creation. + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<Boolean> fut = futs.get(i); + + if (!isMini(fut)) + continue; + + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId().equals(miniId)) { + if (!mini.isDone()) + return mini; + else + return null; + } + } + } + + return null; + } + + + /** * @param t Error. */ private void onError(Throwable t) { http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java index 165da84..805a6a2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockRequest.java @@ -20,13 +20,11 @@ package org.apache.ignite.internal.processors.cache.distributed.near; import java.io.Externalizable; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.Collection; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockRequest; @@ -300,7 +298,7 @@ public class GridNearLockRequest extends GridDistributedLockRequest { dhtVers[idx] = dhtVer; // Delegate to super. - addKeyBytes(key, retVal, (Collection<GridCacheMvccCandidate>)null, ctx); + addKeyBytes(key, retVal, ctx); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 29774a5..1569b14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -46,6 +46,7 @@ import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedExceptio import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -210,17 +211,10 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { if (!isDone()) { - for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + MiniFuture mini = miniFuture(res.miniId()); - if (f.futureId().equals(res.miniId())) { - assert f.node().id().equals(nodeId); - - f.onResult(res); - } - } - } + if (mini != null) + mini.onResult(res); } } @@ -239,6 +233,37 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } /** + * Finds pending mini future by the given mini ID. + * + * @param miniId Mini ID to find. + * @return Mini future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private MiniFuture miniFuture(IgniteUuid miniId) { + // We iterate directly over the futs collection here to avoid copy. + synchronized (futs) { + // Avoid iterator creation. + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i); + + if (!isMini(fut)) + continue; + + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId().equals(miniId)) { + if (!mini.isDone()) + return mini; + else + return null; + } + } + } + + return null; + } + + /** * @param f Future. * @return {@code True} if mini-future. */ @@ -276,32 +301,27 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param remap Remap flag. */ @Override protected void prepare0(boolean remap, boolean topLocked) { - try { - boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); - - if (!txStateCheck) { - if (tx.setRollbackOnly()) { - if (tx.timedOut()) - onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + - "was rolled back: " + this)); - else - onError(null, new IgniteCheckedException("Invalid transaction state for prepare " + - "[state=" + tx.state() + ", tx=" + this + ']')); - } - else - onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + - "prepare [state=" + tx.state() + ", tx=" + this + ']')); + boolean txStateCheck = remap ? tx.state() == PREPARING : tx.state(PREPARING); - return; + if (!txStateCheck) { + if (tx.setRollbackOnly()) { + if (tx.timedOut()) + onError(null, new IgniteTxTimeoutCheckedException("Transaction timed out and " + + "was rolled back: " + this)); + else + onError(null, new IgniteCheckedException("Invalid transaction state for prepare " + + "[state=" + tx.state() + ", tx=" + this + ']')); } + else + onError(null, new IgniteTxRollbackCheckedException("Invalid transaction state for " + + "prepare [state=" + tx.state() + ", tx=" + this + ']')); - prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); - - markInitialized(); - } - catch (IgniteCheckedException e) { - onDone(e); + return; } + + prepare(tx.readEntries(), tx.writeEntries(), remap, topLocked); + + markInitialized(); } /** @@ -309,7 +329,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param writes Write entries. * @param remap Remap flag. * @param topLocked Topology locked flag. - * @throws IgniteCheckedException If failed. */ @SuppressWarnings("unchecked") private void prepare( @@ -317,7 +336,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim Iterable<IgniteTxEntry> writes, boolean remap, boolean topLocked - ) throws IgniteCheckedException { + ) { AffinityTopologyVersion topVer = tx.topologyVersion(); assert topVer.topologyVersion() > 0; @@ -355,9 +374,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim for (GridDistributedTxMapping m : mappings.values()) { assert !m.empty(); - MiniFuture fut = new MiniFuture(m); - - add(fut); + add(new MiniFuture(m)); } Collection<IgniteInternalFuture<?>> futs = (Collection)futures(); http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 791d2f3..82e3868 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; @@ -187,18 +188,45 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** {@inheritDoc} */ @Override public void onResult(UUID nodeId, GridNearTxPrepareResponse res) { if (!isDone()) { - for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) { - if (isMini(fut)) { - MiniFuture f = (MiniFuture)fut; + MiniFuture mini = miniFuture(res.miniId()); - if (f.futureId().equals(res.miniId())) { - assert f.node().id().equals(nodeId); + if (mini != null) { + assert mini.node().id().equals(nodeId); - f.onResult(nodeId, res); - } + mini.onResult(nodeId, res); + } + } + } + + /** + * Finds pending mini future by the given mini ID. + * + * @param miniId Mini ID to find. + * @return Mini future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private MiniFuture miniFuture(IgniteUuid miniId) { + // We iterate directly over the futs collection here to avoid copy. + synchronized (futs) { + // Avoid iterator creation. + for (int i = 0; i < futs.size(); i++) { + IgniteInternalFuture<GridNearTxPrepareResponse> fut = futs.get(i); + + if (!isMini(fut)) + continue; + + MiniFuture mini = (MiniFuture)fut; + + if (mini.futureId().equals(miniId)) { + if (!mini.isDone()) + return mini; + else + return null; } } } + + return null; } /** {@inheritDoc} */ @@ -277,10 +305,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa markInitialized(); } catch (TransactionTimeoutException e) { - onError( e); - } - catch (IgniteCheckedException e) { - onDone(e); + onError(e); } } @@ -327,12 +352,11 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa /** * @param writes Write entries. * @param topLocked {@code True} if thread already acquired lock preventing topology change. - * @throws IgniteCheckedException If failed. */ private void prepare( Iterable<IgniteTxEntry> writes, boolean topLocked - ) throws IgniteCheckedException { + ) { AffinityTopologyVersion topVer = tx.topologyVersion(); assert topVer.topologyVersion() > 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/175b7f24/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 1554a62..103105e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -103,20 +103,45 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA if (!isDone()) { assert res.clientRemapVersion() == null : res; - for (IgniteInternalFuture<GridNearTxPrepareResponse> fut : pending()) { - MiniFuture f = (MiniFuture)fut; + MiniFuture f = miniFuture(res.miniId()); - if (f.futureId().equals(res.miniId())) { - assert f.node().id().equals(nodeId); + if (f != null) { + assert f.node().id().equals(nodeId); - if (log.isDebugEnabled()) - log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f); + if (log.isDebugEnabled()) + log.debug("Remote node left grid while sending or waiting for reply (will not retry): " + f); - f.onResult(res); + f.onResult(res); + } + } + } + + /** + * Finds pending mini future by the given mini ID. + * + * @param miniId Mini ID to find. + * @return Mini future. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private MiniFuture miniFuture(IgniteUuid miniId) { + // We iterate directly over the futs collection here to avoid copy. + synchronized (futs) { + // Avoid iterator creation. + for (int i = 0; i < futs.size(); i++) { + MiniFuture mini = (MiniFuture)futs.get(i); + + if (mini.futureId().equals(miniId)) { + if (!mini.isDone()) + return mini; + else + return null; } } } + + return null; } + /** {@inheritDoc} */ @Override public void prepare() { if (!tx.state(PREPARING)) {