IGNITE-264 - Check backup node for one-phase transaction when primary node crashes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e1707b68 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e1707b68 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e1707b68 Branch: refs/heads/master Commit: e1707b6852f9d7c3e4999ea1d3967db68e7d8634 Parents: 071586e Author: Alexey Goncharuk <alexey.goncha...@gmail.com> Authored: Thu Sep 10 23:29:33 2015 -0700 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Thu Sep 10 23:29:33 2015 -0700 ---------------------------------------------------------------------- .../internal/portable/PortableContext.java | 18 +- .../processors/cache/GridCacheAdapter.java | 6 + .../processors/cache/GridCacheContext.java | 4 +- .../processors/cache/GridCacheIoManager.java | 4 +- .../processors/cache/GridCacheMvcc.java | 5 +- .../distributed/GridCacheTxRecoveryFuture.java | 11 +- .../distributed/GridDistributedCacheEntry.java | 6 +- .../GridDistributedTxFinishRequest.java | 13 +- .../GridDistributedTxRemoteAdapter.java | 10 +- .../dht/GridDhtTransactionalCacheAdapter.java | 514 +++++++++---------- .../distributed/dht/GridDhtTxFinishFuture.java | 15 +- .../distributed/dht/GridDhtTxFinishRequest.java | 84 ++- .../dht/GridDhtTxFinishResponse.java | 89 +++- .../cache/distributed/dht/GridDhtTxLocal.java | 4 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 67 ++- .../distributed/dht/GridDhtTxPrepareFuture.java | 32 +- .../cache/distributed/dht/GridDhtTxRemote.java | 40 +- .../dht/GridPartitionedGetFuture.java | 4 +- .../dht/atomic/GridNearAtomicUpdateFuture.java | 4 +- .../colocated/GridDhtColocatedLockFuture.java | 11 +- .../distributed/near/GridNearLockFuture.java | 11 +- .../distributed/near/GridNearLockRequest.java | 18 +- .../near/GridNearOptimisticTxPrepareFuture.java | 52 +- .../GridNearPessimisticTxPrepareFuture.java | 11 +- .../near/GridNearTxFinishFuture.java | 319 ++++++++++-- .../near/GridNearTxFinishRequest.java | 20 +- 
.../cache/distributed/near/GridNearTxLocal.java | 64 +-- .../distributed/near/GridNearTxRemote.java | 38 +- .../cache/transactions/IgniteTxAdapter.java | 5 +- .../cache/transactions/IgniteTxHandler.java | 281 +++++----- .../transactions/IgniteTxLocalAdapter.java | 37 +- .../cache/transactions/IgniteTxManager.java | 48 +- .../datastructures/DataStructuresProcessor.java | 102 ++-- .../GridTransactionalCacheQueueImpl.java | 15 +- .../processors/igfs/IgfsFileAffinityRange.java | 4 +- .../portable/GridPortableMetaDataSelfTest.java | 2 + .../CacheStoreUsageMultinodeAbstractTest.java | 16 +- .../cache/GridCacheAbstractFullApiSelfTest.java | 50 +- .../processors/cache/GridCacheMvccSelfTest.java | 4 +- .../cache/GridCachePutAllFailoverSelfTest.java | 28 +- .../cache/IgniteCachePutAllRestartTest.java | 2 + .../cache/IgniteInternalCacheTypesTest.java | 4 +- .../cache/IgniteOnePhaseCommitNearSelfTest.java | 243 +++++++++ ...ridCachePartitionNotLoadedEventSelfTest.java | 27 +- .../GridCacheTransformEventSelfTest.java | 5 +- .../dht/GridCacheTxNodeFailureSelfTest.java | 400 +++++++++++++++ .../dht/GridNearCacheTxNodeFailureSelfTest.java | 31 ++ ...gniteAtomicLongChangingTopologySelfTest.java | 283 ++++++++++ .../near/IgniteCacheNearOnlyTxTest.java | 14 +- .../IgniteCacheFailoverTestSuite.java | 9 +- 50 files changed, 2350 insertions(+), 734 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java index c64adc8..165ad9a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java @@ -440,7 +440,7 @@ public class PortableContext implements Externalizable { PortableClassDescriptor desc = descByCls.get(cls); if (desc == null || !desc.registered()) - desc = registerClassDescriptor(cls); + desc = registerClassDescriptor(cls, true); return desc; } @@ -485,7 +485,7 @@ public class PortableContext implements Externalizable { } if (desc == null) { - desc = registerClassDescriptor(cls); + desc = registerClassDescriptor(cls, false); assert desc.typeId() == typeId; } @@ -499,7 +499,7 @@ public class PortableContext implements Externalizable { * @param cls Class. * @return Class descriptor. */ - private PortableClassDescriptor registerClassDescriptor(Class<?> cls) { + private PortableClassDescriptor registerClassDescriptor(Class<?> cls, boolean registerMetadata) { PortableClassDescriptor desc; String clsName = cls.getName(); @@ -525,7 +525,7 @@ public class PortableContext implements Externalizable { desc = old; } else - desc = registerUserClassDescriptor(cls); + desc = registerUserClassDescriptor(cls, registerMetadata); return desc; } @@ -536,9 +536,7 @@ public class PortableContext implements Externalizable { * @param cls Class. * @return Class descriptor. 
*/ - private PortableClassDescriptor registerUserClassDescriptor(Class<?> cls) { - PortableClassDescriptor desc; - + private PortableClassDescriptor registerUserClassDescriptor(Class<?> cls, boolean registerMetadata) { boolean registered; String typeName = typeName(cls.getName()); @@ -555,7 +553,7 @@ public class PortableContext implements Externalizable { throw new PortableException("Failed to register class.", e); } - desc = new PortableClassDescriptor(this, + PortableClassDescriptor desc = new PortableClassDescriptor(this, cls, true, typeId, @@ -573,6 +571,10 @@ public class PortableContext implements Externalizable { userTypes.put(typeId, desc); descByCls.put(cls, desc); + // TODO uncomment for https://issues.apache.org/jira/browse/IGNITE-1377 +// if (registerMetadata && isMetaDataEnabled(typeId)) +// metaHnd.addMeta(typeId, new PortableMetaDataImpl(typeName, desc.fieldsMeta(), null)); + return desc; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 79c5e4b..4460a2a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -4230,6 +4230,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { return tFut.get(); } + catch (IgniteTxRollbackCheckedException e) { + throw e; + } catch (IgniteCheckedException e1) { tx0.rollbackAsync(); @@ -4253,6 +4256,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V try { return tFut.get(); } + catch (IgniteTxRollbackCheckedException e) { 
+ throw e; + } catch (IgniteCheckedException e1) { tx0.rollbackAsync(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 3d0f1ae..86ba3e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -480,7 +480,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if should use system transactions which are isolated from user transactions. */ public boolean systemTx() { - return cacheType == CacheType.UTILITY; + return cacheType == CacheType.UTILITY || (cacheType == CacheType.INTERNAL && transactional()); } /** @@ -1977,4 +1977,4 @@ public class GridCacheContext<K, V> implements Externalizable { @Override public String toString() { return "GridCacheContext: " + name(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index a935b26..b55c84d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -643,7 +643,7 @@ public class GridCacheIoManager 
extends GridCacheSharedManagerAdapter { if (!cctx.discovery().alive(node.id()) || !cctx.discovery().pingNode(node.id())) throw new ClusterTopologyCheckedException("Node left grid while sending message to: " + node.id(), e); - if (cnt == retryCnt) + if (cnt == retryCnt || cctx.kernalContext().isStopping()) throw e; else if (log.isDebugEnabled()) log.debug("Failed to send message to node (will retry): " + node.id()); @@ -1107,4 +1107,4 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { return res; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java index d1393ce..c2102bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMvcc.java @@ -527,7 +527,8 @@ public final class GridCacheMvcc { /*reenter*/false, tx, implicitSingle, - /*near-local*/false, dhtLoc + /*near-local*/false, + dhtLoc ); cctx.mvcc().addLocal(cand); @@ -1271,4 +1272,4 @@ public final class GridCacheMvcc { @Override public String toString() { // Synchronize to ensure one-thread at a time. 
return S.toString(GridCacheMvcc.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java index eaaff67..d6f6a18 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridCacheTxRecoveryFuture.java @@ -167,7 +167,7 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea try { cctx.io().send(nearNodeId, req, tx.ioPolicy()); } - catch (ClusterTopologyCheckedException e) { + catch (ClusterTopologyCheckedException ignore) { fut.onNodeLeft(); } catch (IgniteCheckedException e) { @@ -374,14 +374,11 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea if (isMini(fut)) { MiniFuture f = (MiniFuture)fut; - if (f.nodeId().equals(nodeId)) { + if (f.nodeId().equals(nodeId)) f.onNodeLeft(); - - return true; - } } - return false; + return true; } /** {@inheritDoc} */ @@ -509,4 +506,4 @@ public class GridCacheTxRecoveryFuture extends GridCompoundIdentityFuture<Boolea return S.toString(MiniFuture.class, this, "done", isDone(), "err", error()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index 3fada86..6904e56 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -186,8 +186,8 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { long timeout, boolean tx, boolean implicitSingle, - @Nullable GridCacheVersion owned) throws GridDistributedLockCancelledException, - GridCacheEntryRemovedException { + @Nullable GridCacheVersion owned + ) throws GridDistributedLockCancelledException, GridCacheEntryRemovedException { GridCacheMvccCandidate prev; GridCacheMvccCandidate owner; @@ -872,4 +872,4 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { @Override public synchronized String toString() { return S.toString(GridDistributedCacheEntry.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java index fdd59be..ddf6799 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishRequest.java @@ -210,17 +210,6 @@ public class 
GridDistributedTxFinishRequest extends GridDistributedBaseMessage { return commit ? syncCommit : syncRollback; } - /** {@inheritDoc} - * @param ctx*/ - @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { - super.prepareMarshal(ctx); - } - - /** {@inheritDoc} */ - @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { - super.finishUnmarshal(ctx, ldr); - } - /** {@inheritDoc} */ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { writer.setBuffer(buf); @@ -426,4 +415,4 @@ public class GridDistributedTxFinishRequest extends GridDistributedBaseMessage { return GridToStringBuilder.toString(GridDistributedTxFinishRequest.class, this, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 41f9872..c930d88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -458,17 +458,17 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter assert txEntry != null : "Missing transaction entry for tx: " + this; while (true) { - GridCacheEntryEx Entry = txEntry.cached(); + GridCacheEntryEx entry = txEntry.cached(); - assert Entry != null : "Missing cached entry for transaction entry: " + txEntry; + assert entry != null : "Missing cached 
entry for transaction entry: " + txEntry; try { GridCacheVersion ver = txEntry.explicitVersion() != null ? txEntry.explicitVersion() : xidVer; // If locks haven't been acquired yet, keep waiting. - if (!Entry.lockedBy(ver)) { + if (!entry.lockedBy(ver)) { if (log.isDebugEnabled()) - log.debug("Transaction does not own lock for entry (will wait) [entry=" + Entry + + log.debug("Transaction does not own lock for entry (will wait) [entry=" + entry + ", tx=" + this + ']'); return; @@ -802,4 +802,4 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter @Override public String toString() { return GridToStringBuilder.toString(GridDistributedTxRemoteAdapter.class, this, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index d81b72c..b9514a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -709,337 +709,311 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach @Nullable final CacheEntryPredicate[] filter0) { final List<KeyCacheObject> keys = req.keys(); - IgniteInternalFuture<Object> keyFut = null; + CacheEntryPredicate[] filter = filter0; - if (req.onePhaseCommit()) { - boolean forceKeys = req.hasTransforms() || req.filter() != null; + // Set message into thread context. 
+ GridDhtTxLocal tx = null; - if (!forceKeys) { - for (int i = 0; i < req.keysCount() && !forceKeys; i++) - forceKeys |= req.returnValue(i); - } - - if (forceKeys) - keyFut = ctx.dht().dhtPreloader().request(keys, req.topologyVersion()); - } + try { + int cnt = keys.size(); - if (keyFut == null) - keyFut = new GridFinishedFuture<>(); + if (req.inTx()) { + GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version()); - return new GridEmbeddedFuture<>(keyFut, - new C2<Object, Exception, IgniteInternalFuture<GridNearLockResponse>>() { - @Override public IgniteInternalFuture<GridNearLockResponse> apply(Object o, Exception exx) { - if (exx != null) - return new GridDhtFinishedFuture<>(exx); + if (dhtVer != null) + tx = ctx.tm().tx(dhtVer); + } - CacheEntryPredicate[] filter = filter0; + final List<GridCacheEntryEx> entries = new ArrayList<>(cnt); - // Set message into thread context. - GridDhtTxLocal tx = null; + // Unmarshal filter first. + if (filter == null) + filter = req.filter(); - try { - int cnt = keys.size(); + GridDhtLockFuture fut = null; - if (req.inTx()) { - GridCacheVersion dhtVer = ctx.tm().mappedVersion(req.version()); + if (!req.inTx()) { + GridDhtPartitionTopology top = null; - if (dhtVer != null) - tx = ctx.tm().tx(dhtVer); - } + if (req.firstClientRequest()) { + assert CU.clientNode(nearNode); - final List<GridCacheEntryEx> entries = new ArrayList<>(cnt); + top = topology(); - // Unmarshal filter first. 
- if (filter == null) - filter = req.filter(); + topology().readLock(); + } - GridDhtLockFuture fut = null; + try { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); + } - if (!req.inTx()) { - GridDhtPartitionTopology top = null; + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.topologyVersion()); - if (req.firstClientRequest()) { - assert CU.clientNode(nearNode); + return new GridFinishedFuture<>(res); + } - top = topology(); + fut = new GridDhtLockFuture(ctx, + nearNode.id(), + req.version(), + req.topologyVersion(), + cnt, + req.txRead(), + req.needReturnValue(), + req.timeout(), + tx, + req.threadId(), + req.accessTtl(), + filter, + req.skipStore()); + + // Add before mapping. + if (!ctx.mvcc().addFuture(fut)) + throw new IllegalStateException("Duplicate future ID: " + fut); + } + finally { + if (top != null) + top.readUnlock(); + } + } - topology().readLock(); - } + boolean timedout = false; - try { - if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) { - if (log.isDebugEnabled()) { - log.debug("Client topology version mismatch, need remap lock request [" + - "reqTopVer=" + req.topologyVersion() + - ", locTopVer=" + top.topologyVersion() + - ", req=" + req + ']'); - } + for (KeyCacheObject key : keys) { + if (timedout) + break; - GridNearLockResponse res = sendClientLockRemapResponse(nearNode, - req, - top.topologyVersion()); + while (true) { + // Specify topology version to make sure containment is checked + // based on the requested version, not the latest. + GridDhtCacheEntry entry = entryExx(key, req.topologyVersion()); - return new GridFinishedFuture<>(res); - } + try { + if (fut != null) { + // This method will add local candidate. 
+ // Entry cannot become obsolete after this method succeeded. + fut.addEntry(key == null ? null : entry); - fut = new GridDhtLockFuture(ctx, - nearNode.id(), - req.version(), - req.topologyVersion(), - cnt, - req.txRead(), - req.needReturnValue(), - req.timeout(), - tx, - req.threadId(), - req.accessTtl(), - filter, - req.skipStore()); + if (fut.isDone()) { + timedout = true; - // Add before mapping. - if (!ctx.mvcc().addFuture(fut)) - throw new IllegalStateException("Duplicate future ID: " + fut); - } - finally { - if (top != null) - top.readUnlock(); + break; } } - boolean timedout = false; - - for (KeyCacheObject key : keys) { - if (timedout) - break; + entries.add(entry); - while (true) { - // Specify topology version to make sure containment is checked - // based on the requested version, not the latest. - GridDhtCacheEntry entry = entryExx(key, req.topologyVersion()); + break; + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry when adding lock (will retry): " + entry); + } + catch (GridDistributedLockCancelledException e) { + if (log.isDebugEnabled()) + log.debug("Got lock request for cancelled lock (will ignore): " + + entry); - try { - if (fut != null) { - // This method will add local candidate. - // Entry cannot become obsolete after this method succeeded. - fut.addEntry(key == null ? null : entry); + fut.onError(e); - if (fut.isDone()) { - timedout = true; + return new GridDhtFinishedFuture<>(e); + } + } + } - break; - } - } + // Handle implicit locks for pessimistic transactions. 
+ if (req.inTx()) { + if (tx == null) { + GridDhtPartitionTopology top = null; - entries.add(entry); + if (req.firstClientRequest()) { + assert CU.clientNode(nearNode); - break; - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry when adding lock (will retry): " + entry); - } - catch (GridDistributedLockCancelledException e) { - if (log.isDebugEnabled()) - log.debug("Got lock request for cancelled lock (will ignore): " + - entry); + top = topology(); - fut.onError(e); + topology().readLock(); + } - return new GridDhtFinishedFuture<>(e); - } + try { + if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) { + if (log.isDebugEnabled()) { + log.debug("Client topology version mismatch, need remap lock request [" + + "reqTopVer=" + req.topologyVersion() + + ", locTopVer=" + top.topologyVersion() + + ", req=" + req + ']'); } - } - // Handle implicit locks for pessimistic transactions. - if (req.inTx()) { - if (tx == null) { - GridDhtPartitionTopology top = null; + GridNearLockResponse res = sendClientLockRemapResponse(nearNode, + req, + top.topologyVersion()); - if (req.firstClientRequest()) { - assert CU.clientNode(nearNode); + return new GridFinishedFuture<>(res); + } - top = topology(); + tx = new GridDhtTxLocal( + ctx.shared(), + nearNode.id(), + req.version(), + req.futureId(), + req.miniId(), + req.threadId(), + req.implicitTx(), + req.implicitSingleTx(), + ctx.systemTx(), + false, + ctx.ioPolicy(), + PESSIMISTIC, + req.isolation(), + req.timeout(), + req.isInvalidate(), + !req.skipStore(), + false, + req.txSize(), + null, + req.subjectId(), + req.taskNameHash()); - topology().readLock(); - } + tx.syncCommit(req.syncCommit()); - try { - if (top != null && needRemap(req.topologyVersion(), top.topologyVersion())) { - if (log.isDebugEnabled()) { - log.debug("Client topology version mismatch, need remap lock request [" + - "reqTopVer=" + req.topologyVersion() + - ", locTopVer=" + 
top.topologyVersion() + - ", req=" + req + ']'); - } + tx = ctx.tm().onCreated(null, tx); - GridNearLockResponse res = sendClientLockRemapResponse(nearNode, - req, - top.topologyVersion()); + if (tx == null || !tx.init()) { + String msg = "Failed to acquire lock (transaction has been completed): " + + req.version(); - return new GridFinishedFuture<>(res); - } + U.warn(log, msg); - tx = new GridDhtTxLocal( - ctx.shared(), - nearNode.id(), - req.version(), - req.futureId(), - req.miniId(), - req.threadId(), - req.implicitTx(), - req.implicitSingleTx(), - ctx.systemTx(), - false, - ctx.ioPolicy(), - PESSIMISTIC, - req.isolation(), - req.timeout(), - req.isInvalidate(), - true, - req.txSize(), - null, - req.subjectId(), - req.taskNameHash()); - - tx.syncCommit(req.syncCommit()); - - tx = ctx.tm().onCreated(null, tx); - - if (tx == null || !tx.init()) { - String msg = "Failed to acquire lock (transaction has been completed): " + - req.version(); - - U.warn(log, msg); - - if (tx != null) - tx.rollback(); - - return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); - } + if (tx != null) + tx.rollback(); - tx.topologyVersion(req.topologyVersion()); - } - finally { - if (top != null) - top.readUnlock(); - } - } + return new GridDhtFinishedFuture<>(new IgniteCheckedException(msg)); + } - ctx.tm().txContext(tx); + tx.topologyVersion(req.topologyVersion()); + } + finally { + if (top != null) + top.readUnlock(); + } + } - if (log.isDebugEnabled()) - log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']'); + ctx.tm().txContext(tx); - IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync( - cacheCtx, + if (log.isDebugEnabled()) + log.debug("Performing DHT lock [tx=" + tx + ", entries=" + entries + ']'); + + IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync( + cacheCtx, + entries, + req.messageId(), + req.txRead(), + req.needReturnValue(), + req.accessTtl(), + req.skipStore()); + + final GridDhtTxLocal t = tx; + + return new 
GridDhtEmbeddedFuture( + txFut, + new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() { + @Override public IgniteInternalFuture<GridNearLockResponse> apply( + GridCacheReturn o, Exception e) { + if (e != null) + e = U.unwrap(e); + + assert !t.empty(); + + // Create response while holding locks. + final GridNearLockResponse resp = createLockReply(nearNode, entries, - req.onePhaseCommit(), - req.messageId(), - req.txRead(), - req.needReturnValue(), - req.accessTtl(), - req.skipStore()); + req, + t, + t.xidVersion(), + e); + + if (resp.error() == null && t.onePhaseCommit()) { + assert t.implicit(); + + return t.commitAsync().chain( + new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() { + @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) { + try { + // Check for error. + f.get(); + } + catch (IgniteCheckedException e1) { + resp.error(e1); + } - final GridDhtTxLocal t = tx; - - return new GridDhtEmbeddedFuture( - txFut, - new C2<GridCacheReturn, Exception, IgniteInternalFuture<GridNearLockResponse>>() { - @Override public IgniteInternalFuture<GridNearLockResponse> apply( - GridCacheReturn o, Exception e) { - if (e != null) - e = U.unwrap(e); - - assert !t.empty(); - - // Create response while holding locks. - final GridNearLockResponse resp = createLockReply(nearNode, - entries, - req, - t, - t.xidVersion(), - e); - - if (resp.error() == null && t.onePhaseCommit()) { - assert t.implicit(); - - return t.commitAsync().chain( - new C1<IgniteInternalFuture<IgniteInternalTx>, GridNearLockResponse>() { - @Override public GridNearLockResponse apply(IgniteInternalFuture<IgniteInternalTx> f) { - try { - // Check for error. 
- f.get(); - } - catch (IgniteCheckedException e1) { - resp.error(e1); - } - - sendLockReply(nearNode, t, req, resp); - - return resp; - } - }); - } - else { sendLockReply(nearNode, t, req, resp); - return new GridFinishedFuture<>(resp); + return resp; } - } - } - ); + }); + } + else { + sendLockReply(nearNode, t, req, resp); + + return new GridFinishedFuture<>(resp); + } } - else { - assert fut != null; + } + ); + } + else { + assert fut != null; - // This will send remote messages. - fut.map(); + // This will send remote messages. + fut.map(); - final GridCacheVersion mappedVer = fut.version(); + final GridCacheVersion mappedVer = fut.version(); - return new GridDhtEmbeddedFuture<>( - new C2<Boolean, Exception, GridNearLockResponse>() { - @Override public GridNearLockResponse apply(Boolean b, Exception e) { - if (e != null) - e = U.unwrap(e); - else if (!b) - e = new GridCacheLockTimeoutException(req.version()); + return new GridDhtEmbeddedFuture<>( + new C2<Boolean, Exception, GridNearLockResponse>() { + @Override public GridNearLockResponse apply(Boolean b, Exception e) { + if (e != null) + e = U.unwrap(e); + else if (!b) + e = new GridCacheLockTimeoutException(req.version()); - GridNearLockResponse res = createLockReply(nearNode, - entries, - req, - null, - mappedVer, - e); + GridNearLockResponse res = createLockReply(nearNode, + entries, + req, + null, + mappedVer, + e); - sendLockReply(nearNode, null, req, res); + sendLockReply(nearNode, null, req, res); - return res; - } - }, - fut); + return res; } - } - catch (IgniteCheckedException e) { - String err = "Failed to unmarshal at least one of the keys for lock request message: " + req; - - U.error(log, err, e); + }, + fut); + } + } + catch (IgniteCheckedException | RuntimeException e) { + String err = "Failed to unmarshal at least one of the keys for lock request message: " + req; - if (tx != null) { - try { - tx.rollback(); - } - catch (IgniteCheckedException ex) { - U.error(log, "Failed to rollback the 
transaction: " + tx, ex); - } - } + U.error(log, err, e); - return new GridDhtFinishedFuture<>( - new IgniteCheckedException(err, e)); - } + if (tx != null) { + try { + tx.rollback(); + } + catch (IgniteCheckedException ex) { + U.error(log, "Failed to rollback the transaction: " + tx, ex); } } - ); + + return new GridDhtFinishedFuture<>( + new IgniteCheckedException(err, e)); + } } /** @@ -1626,4 +1600,4 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (nearEntry != null) nearEntry.markObsolete(ctx.versions().next()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index a7ec20f..79bccc2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -219,7 +219,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur @Override public boolean onDone(IgniteInternalTx tx, Throwable err) { if (initialized() || err != null) { if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) - this.tx.tmCommit(); + this.tx.tmFinish(err == null); Throwable e = this.err.get(); @@ -255,7 +255,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** * Initializes future. 
*/ - @SuppressWarnings("SimplifiableIfStatement") + @SuppressWarnings({"SimplifiableIfStatement", "IfMayBeConditional"}) public void finish() { boolean sync; @@ -277,7 +277,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur * @param nodes Nodes. * @return {@code True} in case there is at least one synchronous {@code MiniFuture} to wait for. */ - private boolean rollbackLockTransactions(Set<ClusterNode> nodes) { + private boolean rollbackLockTransactions(Collection<ClusterNode> nodes) { assert !commit; assert !F.isEmpty(nodes); @@ -399,6 +399,8 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash()); + req.writeVersion(tx.writeVersion() != null ? tx.writeVersion() : tx.xidVersion()); + try { cctx.io().send(n, req, tx.ioPolicy()); @@ -450,8 +452,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur tx.subjectId(), tx.taskNameHash()); - if (tx.onePhaseCommit()) - req.writeVersion(tx.writeVersion()); + req.writeVersion(tx.writeVersion()); try { cctx.io().send(nearMapping.node(), req, tx.ioPolicy()); @@ -516,7 +517,7 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur /** * @param node Node. 
*/ - public MiniFuture(ClusterNode node) { + private MiniFuture(ClusterNode node) { this.node = node; } @@ -582,4 +583,4 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCompoundIdentityFutur return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index a9cb299..f859314 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -63,6 +63,9 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { @GridDirectCollection(GridCacheVersion.class) private Collection<GridCacheVersion> pendingVers; + /** Check committed flag. */ + private boolean checkCommitted; + /** One phase commit write version.
*/ private GridCacheVersion writeVer; @@ -126,8 +129,21 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { @Nullable UUID subjId, int taskNameHash ) { - super(xidVer, futId, commitVer, threadId, commit, invalidate, sys, plc, syncCommit, syncRollback, baseVer, - committedVers, rolledbackVers, txSize); + super( + xidVer, + futId, + commitVer, + threadId, + commit, + invalidate, + sys, + plc, + syncCommit, + syncRollback, + baseVer, + committedVers, + rolledbackVers, + txSize); assert miniId != null; assert nearNodeId != null; @@ -221,6 +237,20 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { return pendingVers == null ? Collections.<GridCacheVersion>emptyList() : pendingVers; } + /** + * @return Check committed flag. + */ + public boolean checkCommitted() { + return checkCommitted; + } + + /** + * @param checkCommitted Check committed flag. + */ + public void checkCommitted(boolean checkCommitted) { + this.checkCommitted = checkCommitted; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtTxFinishRequest.class, this, super.toString()); @@ -242,54 +272,60 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { switch (writer.state()) { case 18: - if (!writer.writeByte("isolation", isolation != null ? (byte)isolation.ordinal() : -1)) + if (!writer.writeBoolean("checkCommitted", checkCommitted)) return false; writer.incrementState(); case 19: - if (!writer.writeIgniteUuid("miniId", miniId)) + if (!writer.writeByte("isolation", isolation != null ? 
(byte)isolation.ordinal() : -1)) return false; writer.incrementState(); case 20: - if (!writer.writeUuid("nearNodeId", nearNodeId)) + if (!writer.writeIgniteUuid("miniId", miniId)) return false; writer.incrementState(); case 21: - if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) + if (!writer.writeUuid("nearNodeId", nearNodeId)) return false; writer.incrementState(); case 22: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeCollection("pendingVers", pendingVers, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 23: - if (!writer.writeBoolean("sysInvalidate", sysInvalidate)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 24: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeBoolean("sysInvalidate", sysInvalidate)) return false; writer.incrementState(); case 25: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 26: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 27: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -312,6 +348,14 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { switch (reader.state()) { case 18: + checkCommitted = reader.readBoolean("checkCommitted"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: byte isolationOrd; isolationOrd = reader.readByte("isolation"); @@ -323,7 +367,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 19: + case 20: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -331,7 +375,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 20: + case 21: nearNodeId = reader.readUuid("nearNodeId"); if (!reader.isLastRead()) @@ 
-339,7 +383,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 21: + case 22: pendingVers = reader.readCollection("pendingVers", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -347,7 +391,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 22: + case 23: subjId = reader.readUuid("subjId"); if (!reader.isLastRead()) @@ -355,7 +399,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 23: + case 24: sysInvalidate = reader.readBoolean("sysInvalidate"); if (!reader.isLastRead()) @@ -363,7 +407,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 24: + case 25: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -371,7 +415,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 25: + case 26: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -379,7 +423,7 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { reader.incrementState(); - case 26: + case 27: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -399,6 +443,6 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 27; + return 28; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java index d696c05..ec0f234 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishResponse.java @@ -19,6 +19,10 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.io.Externalizable; import java.nio.ByteBuffer; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxFinishResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.typedef.internal.S; @@ -36,6 +40,16 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { /** Mini future ID. */ private IgniteUuid miniId; + /** Error. */ + @GridDirectTransient + private Throwable checkCommittedErr; + + /** Serialized error. */ + private byte[] checkCommittedErrBytes; + + /** Flag indicating if this is a check-committed response. */ + private boolean checkCommitted; + /** * Empty constructor required by {@link Externalizable}. */ @@ -63,6 +77,51 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { return miniId; } + /** + * @return Error for check committed backup requests. + */ + public Throwable checkCommittedError() { + return checkCommittedErr; + } + + /** + * @param checkCommittedErr Error for check committed backup requests. + */ + public void checkCommittedError(Throwable checkCommittedErr) { + this.checkCommittedErr = checkCommittedErr; + } + + /** + * @return Check committed flag. 
+ */ + public boolean checkCommitted() { + return checkCommitted; + } + + /** + * @param checkCommitted Check committed flag. + */ + public void checkCommitted(boolean checkCommitted) { + this.checkCommitted = checkCommitted; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (checkCommittedErr != null) + checkCommittedErrBytes = ctx.marshaller().marshal(checkCommittedErr); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) + throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (checkCommittedErrBytes != null) + checkCommittedErr = ctx.marshaller().unmarshal(checkCommittedErrBytes, ldr); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridDhtTxFinishResponse.class, this, super.toString()); @@ -84,6 +143,18 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { switch (writer.state()) { case 5: + if (!writer.writeBoolean("checkCommitted", checkCommitted)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeByteArray("checkCommittedErrBytes", checkCommittedErrBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeIgniteUuid("miniId", miniId)) return false; @@ -106,6 +177,22 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { switch (reader.state()) { case 5: + checkCommitted = reader.readBoolean("checkCommitted"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + checkCommittedErrBytes = reader.readByteArray("checkCommittedErrBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: miniId = reader.readIgniteUuid("miniId"); if (!reader.isLastRead()) @@ -125,6 +212,6 @@ public class GridDhtTxFinishResponse extends GridDistributedTxFinishResponse { /** 
{@inheritDoc} */ @Override public byte fieldsCount() { - return 6; + return 8; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index b23b3e1..4f8469f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -128,6 +128,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa long timeout, boolean invalidate, boolean storeEnabled, + boolean onePhaseCommit, int txSize, Map<UUID, Collection<UUID>> txNodes, UUID subjId, @@ -146,6 +147,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa timeout, invalidate, storeEnabled, + onePhaseCommit, txSize, subjId, taskNameHash); @@ -700,4 +702,4 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxLocal.class, this, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index 762d26f..8c7d985 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -98,6 +98,9 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** Versions of pending locks for entries of this tx. */ private Collection<GridCacheVersion> pendingVers; + /** Flag indicating that originating node has near cache. */ + private boolean nearOnOriginatingNode; + /** Nodes where transactions were started on lock step. */ private Set<ClusterNode> lockTxNodes; @@ -132,12 +135,28 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { long timeout, boolean invalidate, boolean storeEnabled, + boolean onePhaseCommit, int txSize, @Nullable UUID subjId, int taskNameHash ) { - super(cctx, xidVer, implicit, implicitSingle, sys, plc, concurrency, isolation, timeout, invalidate, - storeEnabled, txSize, subjId, taskNameHash); + super( + cctx, + xidVer, + implicit, + implicitSingle, + sys, + plc, + concurrency, + isolation, + timeout, + invalidate, + storeEnabled, + onePhaseCommit, + txSize, + subjId, + taskNameHash + ); assert cctx != null; @@ -161,6 +180,29 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** + * Sets flag that indicates that originating node has a near cache that participates in this transaction. + * + * @param hasNear Has near cache flag. + */ + public void nearOnOriginatingNode(boolean hasNear) { + nearOnOriginatingNode = hasNear; + } + + /** + * @return {@code True} if explicit lock transaction. + */ + public boolean explicitLock() { + return explicitLock; + } + + /** + * @param explicitLock Explicit lock flag. 
+ */ + public void explicitLock(boolean explicitLock) { + this.explicitLock = explicitLock; + } + + /** * @return Nodes where transactions were started on lock step. */ @Nullable public Set<ClusterNode> lockTransactionNodes() { @@ -229,20 +271,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { } /** - * @return Explicit lock flag. - */ - public boolean explicitLock() { - return explicitLock; - } - - /** - * @param explicitLock Explicit lock flag. - */ - public void explicitLock(boolean explicitLock) { - this.explicitLock = explicitLock; - } - - /** * @return DHT thread ID. */ long dhtThreadId() { @@ -570,7 +598,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { /** * @param cacheCtx Cache context. * @param entries Entries to lock. - * @param onePhaseCommit One phase commit flag. * @param msgId Message ID. * @param read Read flag. * @param accessTtl TTL for read operation. @@ -582,7 +609,6 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { IgniteInternalFuture<GridCacheReturn> lockAllAsync( GridCacheContext cacheCtx, List<GridCacheEntryEx> entries, - boolean onePhaseCommit, long msgId, final boolean read, final boolean needRetVal, @@ -864,13 +890,14 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { * @return {@code True} if transaction is finished on prepare step. */ protected final boolean commitOnPrepare() { - return onePhaseCommit() && !near(); + return onePhaseCommit() && !near() && !nearOnOriginatingNode; } /** * @param prepFut Prepare future. * @return If transaction if finished on prepare step returns future which is completed after transaction finish. 
*/ + @SuppressWarnings("TypeMayBeWeakened") protected final IgniteInternalFuture<GridNearTxPrepareResponse> chainOnePhasePrepare( final GridDhtTxPrepareFuture prepFut) { if (commitOnPrepare()) { @@ -901,4 +928,4 @@ public abstract class GridDhtTxLocalAdapter extends IgniteTxLocalAdapter { return GridToStringBuilder.toString(GridDhtTxLocalAdapter.class, this, "nearNodes", nearMap.keySet(), "dhtNodes", dhtMap.keySet(), "explicitLock", explicitLock, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 6e8460f..89fc0ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteClosure; +import org.apache.ignite.lang.IgniteFutureCancelledException; import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -338,15 +339,13 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter boolean hasFilters = !F.isEmptyOrNulls(txEntry.filters()) && !F.isAlwaysTrue(txEntry.filters()); if (hasFilters || retVal || txEntry.op() == DELETE || txEntry.op() == TRANSFORM) { - 
CacheObject val; - cached.unswap(retVal); boolean readThrough = (retVal || hasFilters) && cacheCtx.config().isLoadPreviousValue() && !txEntry.skipStore(); - val = cached.innerGet( + CacheObject val = cached.innerGet( tx, /*swap*/true, readThrough, @@ -561,7 +560,8 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (tx.optimistic()) tx.clearPrepareFuture(this); - if (tx.onePhaseCommit()) { + // Do not commit one-phase commit transaction if originating node has near cache enabled. + if (tx.onePhaseCommit() && tx.commitOnPrepare()) { assert last; // Must create prepare response before transaction is committed to grab correct return value. @@ -639,8 +639,14 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter * @throws IgniteCheckedException If failed to send response. */ private void sendPrepareResponse(GridNearTxPrepareResponse res) throws IgniteCheckedException { - if (!tx.nearNodeId().equals(cctx.localNodeId())) + if (!tx.nearNodeId().equals(cctx.localNodeId())) { + Throwable err = this.err.get(); + + if (err != null && err instanceof IgniteFutureCancelledException) + return; + cctx.io().send(tx.nearNodeId(), res, tx.ioPolicy()); + } } /** @@ -902,6 +908,7 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter // We are holding transaction-level locks for entries here, so we can get next write version. onEntriesLocked(); + // We are holding transaction-level locks for entries here, so we can get next write version. 
tx.writeVersion(cctx.versions().next(tx.topologyVersion())); { @@ -978,9 +985,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (entry.explicitVersion() == null) { GridCacheMvccCandidate added = cached.candidate(version()); - assert added == null || added.dhtLocal() : - "Got non-dht-local candidate for prepare future " + - "[added=" + added + ", entry=" + entry + ']'; + assert added != null : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; if (added != null && added.ownerVersion() != null) req.owned(entry.txKey(), added.ownerVersion()); @@ -1070,8 +1078,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (entry.explicitVersion() == null) { GridCacheMvccCandidate added = entry.cached().candidate(version()); - assert added == null || added.dhtLocal() : "Got non-dht-local candidate for prepare future" + - "[added=" + added + ", entry=" + entry + ']'; + assert added != null : "Null candidate for non-group-lock entry " + + "[added=" + added + ", entry=" + entry + ']'; + assert added.dhtLocal() : "Got non-dht-local candidate for prepare future" + + "[added=" + added + ", entry=" + entry + ']'; if (added != null && added.ownerVersion() != null) req.owned(entry.txKey(), added.ownerVersion()); @@ -1473,4 +1483,4 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter return S.toString(MiniFuture.class, this, "done", isDone(), "cancelled", isCancelled(), "err", error()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java index 2ff34a9..f8be2a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxRemote.java @@ -111,8 +111,22 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { @Nullable UUID subjId, int taskNameHash ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, - txSize, subjId, taskNameHash); + super( + ctx, + nodeId, + rmtThreadId, + xidVer, + commitVer, + sys, + plc, + concurrency, + isolation, + invalidate, + timeout, + txSize, + subjId, + taskNameHash + ); assert nearNodeId != null; assert rmtFutId != null; @@ -168,8 +182,22 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { @Nullable UUID subjId, int taskNameHash ) { - super(ctx, nodeId, rmtThreadId, xidVer, commitVer, sys, plc, concurrency, isolation, invalidate, timeout, - txSize, subjId, taskNameHash); + super( + ctx, + nodeId, + rmtThreadId, + xidVer, + commitVer, + sys, + plc, + concurrency, + isolation, + invalidate, + timeout, + txSize, + subjId, + taskNameHash + ); assert nearNodeId != null; assert rmtFutId != null; @@ -224,7 +252,7 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { /** * @return Near node ID. 
*/ - UUID nearNodeId() { + public UUID nearNodeId() { return nearNodeId; } @@ -334,4 +362,4 @@ public class GridDhtTxRemote extends GridDistributedTxRemoteAdapter { @Override public String toString() { return GridToStringBuilder.toString(GridDhtTxRemote.class, this, "super", super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 3ddf6d3..0202c53 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -306,7 +306,7 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M ) { if (CU.affinityNodes(cctx, topVer).isEmpty()) { onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + - "(all partition nodes left the grid).")); + "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']')); return; } @@ -816,4 +816,4 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M return S.toString(MiniFuture.class, this); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 65b1d38..d93f68f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -279,7 +279,7 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> public void map() { AffinityTopologyVersion topVer = null; - IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(); + IgniteInternalTx tx = cctx.tm().anyActiveThreadTx(null); if (tx != null && tx.topologyVersionSnapshot() != null) topVer = tx.topologyVersionSnapshot(); @@ -1188,4 +1188,4 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> public String toString() { return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index f7093b8..596ec77 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -47,6 +47,7 @@ 
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLock import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -547,6 +548,14 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture // Obtain the topology version to use. AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(threadId); + // If there is another system transaction in progress, use its topology version to prevent deadlock. + if (topVer == null && tx != null && tx.system()) { + IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx); + + if (tx0 != null) + topVer = tx0.topologyVersionSnapshot(); + } + if (topVer != null && tx != null) tx.topologyVersion(topVer); @@ -1425,4 +1434,4 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString()); } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e1707b68/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 26276f0..f3e5ca3 100644 ---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -45,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCa import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -685,6 +686,14 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean // Obtain the topology version to use. AffinityTopologyVersion topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId()); + // If there is another system transaction in progress, use its topology version to prevent deadlock. + if (topVer == null && tx != null && tx.system()) { + IgniteInternalTx tx0 = cctx.tm().anyActiveThreadTx(tx); + + if (tx0 != null) + topVer = tx0.topologyVersionSnapshot(); + } + if (topVer != null && tx != null) tx.topologyVersion(topVer); @@ -1566,4 +1575,4 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean return S.toString(MiniFuture.class, this, "node", node.id(), "super", super.toString()); } } -} \ No newline at end of file +}