IGNITE-10277 Fixed prepare-commit WAL ordering for one-phase commit - Fixes #5448.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/395811dd Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/395811dd Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/395811dd Branch: refs/heads/ignite-10044 Commit: 395811ddc11ed9d8bfb8972b110a4b7f0555f294 Parents: 493e0aa Author: Anton Kalashnikov <kaa....@yandex.ru> Authored: Mon Dec 3 11:02:51 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Mon Dec 3 11:02:51 2018 +0300 ---------------------------------------------------------------------- .../dht/GridDhtTransactionalCacheAdapter.java | 4 ++- .../cache/distributed/dht/GridDhtTxLocal.java | 7 ++++- .../distributed/dht/GridDhtTxPrepareFuture.java | 19 +++++++++++--- ...arOptimisticSerializableTxPrepareFuture.java | 11 +++++--- .../near/GridNearOptimisticTxPrepareFuture.java | 5 ++-- .../GridNearPessimisticTxPrepareFuture.java | 5 ++-- .../cache/transactions/IgniteTxAdapter.java | 27 ++++++++++++++++++-- .../cache/transactions/IgniteTxHandler.java | 24 ++++++++++++++--- 8 files changed, 84 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index 4693232..dcd7be5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -1122,7 +1122,8 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach null, req.subjectId(), req.taskNameHash(), - req.txLabel()); + req.txLabel(), + null); if (req.syncCommit()) tx.syncMode(FULL_SYNC); @@ -2158,6 +2159,7 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach null, txSubjectId, txTaskNameHash, + null, null); // if (req.syncCommit()) http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 6bad093..53c4942 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridCacheMappedVersion; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -122,6 +123,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa * @param txSize Expected transaction size. * @param txNodes Transaction nodes mapping. * @param lb Transaction label. + * @param parentTx Transaction from which this transaction was copied by(if it was). */ public GridDhtTxLocal( GridCacheSharedContext cctx, @@ -146,7 +148,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa Map<UUID, Collection<UUID>> txNodes, UUID subjId, int taskNameHash, - @Nullable String lb + @Nullable String lb, + GridNearTxLocal parentTx ) { super( cctx, @@ -180,6 +183,8 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa threadId = nearThreadId; + setParentTx(parentTx); + assert !F.eq(xidVer, nearXidVer); initResult(); http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 92e0ce8..58edc9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1003,7 +1003,7 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite * @return {@code True} if {@code done} flag was changed as a result of this call. */ private boolean onComplete(@Nullable GridNearTxPrepareResponse res) { - if ((last || tx.isSystemInvalidate()) && !(tx.near() && tx.local())) + if (!tx.onePhaseCommit() && ((last || tx.isSystemInvalidate()) && !(tx.near() && tx.local()))) tx.state(PREPARED); if (super.onDone(res, res == null ? err : null)) { @@ -1311,8 +1311,14 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if (isDone()) return; - if (last) + if (last) { + recheckOnePhaseCommit(); + + if (tx.onePhaseCommit()) + tx.chainState(PREPARED); + sendPrepareRequests(); + } } finally { markInitialized(); @@ -1320,9 +1326,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } /** - * + * Checking that one phase commit for transaction still actual. */ - private void sendPrepareRequests() { + private void recheckOnePhaseCommit() { if (tx.onePhaseCommit() && !tx.nearMap().isEmpty()) { for (GridDistributedTxMapping nearMapping : tx.nearMap().values()) { if (!tx.dhtMap().containsKey(nearMapping.primary().id())) { @@ -1332,7 +1338,12 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } } } + } + /** + * + */ + private void sendPrepareRequests() { assert !tx.txState().mvccEnabled() || !tx.onePhaseCommit() || tx.mvccSnapshot() != null; int miniId = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 2280619..46bab86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -269,7 +269,8 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim private boolean onComplete() { Throwable err0 = err; - if (err0 == null || tx.needCheckBackup()) + if ((!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == null) && + (err0 == null || tx.needCheckBackup())) tx.state(PREPARED); if (super.onDone(tx, err0)) { @@ -350,9 +351,13 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim hasNearCache = true; } - for (IgniteTxEntry read : reads) + for (IgniteTxEntry read : reads) { map(read, topVer, mappings, txMapping, remap, topLocked); + if (read.context().isNear()) + hasNearCache = true; + } + if (keyLockFut != null) keyLockFut.onAllKeysAdded(); @@ -575,7 +580,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim final MiniFuture fut, final boolean nearEntries) { IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ? - cctx.tm().txHandler().prepareNearTxLocal(req) : + cctx.tm().txHandler().prepareNearTxLocal(tx, req) : cctx.tm().txHandler().prepareColocatedTx(tx, req); prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index d3639c9..140f593 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -297,7 +297,8 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa private boolean onComplete() { Throwable err0 = err; - if (err0 == null || tx.needCheckBackup()) + if ((!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == null) && + (err0 == null || tx.needCheckBackup())) tx.state(PREPARED); if (super.onDone(tx, err0)) { @@ -567,7 +568,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa assert !(m.hasColocatedCacheEntries() && m.hasNearCacheEntries()) : m; IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = - m.hasNearCacheEntries() ? cctx.tm().txHandler().prepareNearTxLocal(req) + m.hasNearCacheEntries() ? cctx.tm().txHandler().prepareNearTxLocal(tx, req) : cctx.tm().txHandler().prepareColocatedTx(tx, req); prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index a480a99..d2e1586 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -254,7 +254,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA add((IgniteInternalFuture)fut); IgniteInternalFuture<GridNearTxPrepareResponse> prepFut = nearEntries ? - cctx.tm().txHandler().prepareNearTxLocal(req) : + cctx.tm().txHandler().prepareNearTxLocal(tx, req) : cctx.tm().txHandler().prepareColocatedTx(tx, req); prepFut.listen(new CI1<IgniteInternalFuture<GridNearTxPrepareResponse>>() { @@ -438,7 +438,8 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA err = this.err; - if (err == null || tx.needCheckBackup()) + if ((!tx.onePhaseCommit() || tx.mappings().get(cctx.localNodeId()) == null) && + (err == null || tx.needCheckBackup())) tx.state(PREPARED); if (super.onDone(tx, err)) { http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index ec1b646..c8b0459 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.cache.transactions; +import javax.cache.expiry.ExpiryPolicy; +import javax.cache.processor.EntryProcessor; import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; @@ -37,8 +39,6 @@ import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; @@ -287,6 +287,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement @GridToStringExclude private volatile TxCounters txCounters; + /** Transaction from which this transaction was copied by(if it was). */ + private GridNearTxLocal parentTx; + /** * Empty constructor required for {@link Externalizable}. */ @@ -406,6 +409,13 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement } /** + * @param parentTx Transaction from which this transaction was copied by. + */ + public void setParentTx(GridNearTxLocal parentTx) { + this.parentTx = parentTx; + } + + /** * @return Mvcc info. */ @Override @Nullable public MvccSnapshot mvccSnapshot() { @@ -1044,6 +1054,19 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement return state(state, false); } + /** + * Changing state for this transaction as well as chained(parent) transactions. + * + * @param state Transaction state. + * @return {@code True} if transition was valid, {@code false} otherwise. + */ + public boolean chainState(TransactionState state) { + if (parentTx != null) + parentTx.state(state); + + return state(state); + } + /** {@inheritDoc} */ @Override public IgniteInternalFuture<IgniteInternalTx> finishFuture() { GridFutureAdapter<IgniteInternalTx> fut = finFut; http://git-wip-us.apache.org/repos/asf/ignite/blob/395811dd/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index aefd54c..319073b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -342,14 +342,17 @@ public class IgniteTxHandler { } /** + * @param originTx Transaction for copy. * @param req Request. * @return Prepare future. */ - public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTxLocal(final GridNearTxPrepareRequest req) { + public IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTxLocal( + final GridNearTxLocal originTx, + final GridNearTxPrepareRequest req) { // Make sure not to provide Near entries to DHT cache. req.cloneEntries(); - return prepareNearTx(ctx.localNode(), req); + return prepareNearTx(originTx, ctx.localNode(), req); } /** @@ -361,6 +364,20 @@ public class IgniteTxHandler { final ClusterNode nearNode, final GridNearTxPrepareRequest req ) { + return prepareNearTx(null, nearNode, req); + } + + /** + * @param originTx Transaction for copy. + * @param nearNode Node that initiated transaction. + * @param req Near prepare request. + * @return Prepare future or {@code null} if need retry operation. + */ + @Nullable private IgniteInternalFuture<GridNearTxPrepareResponse> prepareNearTx( + final GridNearTxLocal originTx, + final ClusterNode nearNode, + final GridNearTxPrepareRequest req + ) { IgniteTxEntry firstEntry; try { @@ -509,7 +526,8 @@ public class IgniteTxHandler { req.transactionNodes(), req.subjectId(), req.taskNameHash(), - req.txLabel() + req.txLabel(), + originTx ); tx = ctx.tm().onCreated(null, tx);