IGNITE-9082 Throwing checked exception during tx commit without node stopping leads to data corruption - Fixes #4809.
Signed-off-by: Ivan Rakov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5eb871e1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5eb871e1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5eb871e1 Branch: refs/heads/master Commit: 5eb871e191a14fc21f6e2c62bdfa742e27c14695 Parents: 829dc1f Author: Aleksei Scherbakov <[email protected]> Authored: Thu Oct 18 14:52:34 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Thu Oct 18 14:52:34 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 5 - .../cache/GridCacheSharedContext.java | 9 + .../GridDistributedTxRemoteAdapter.java | 535 +++++++++---------- .../distributed/dht/GridDhtTxFinishFuture.java | 11 +- .../cache/distributed/dht/GridDhtTxLocal.java | 10 +- .../distributed/dht/GridDhtTxLocalAdapter.java | 3 + .../distributed/dht/GridDhtTxPrepareFuture.java | 59 +- .../near/GridNearTxFinishFuture.java | 38 +- .../cache/distributed/near/GridNearTxLocal.java | 2 +- .../cache/transactions/IgniteTxAdapter.java | 31 ++ .../cache/transactions/IgniteTxHandler.java | 119 ++--- .../transactions/IgniteTxLocalAdapter.java | 519 +++++++++--------- .../processors/failure/FailureProcessor.java | 8 + .../org/apache/ignite/spi/IgniteSpiAdapter.java | 1 - .../cache/GridCacheAbstractSelfTest.java | 9 +- .../cache/query/IndexingSpiQuerySelfTest.java | 66 +-- .../cache/query/IndexingSpiQueryTxSelfTest.java | 74 +-- .../AbstractTransactionIntergrityTest.java | 111 ++-- ...IntegrityWithPrimaryIndexCorruptionTest.java | 268 ++++++---- ...ctionIntegrityWithSystemWorkerDeathTest.java | 6 +- .../TxDataConsistencyOnCommitFailureTest.java | 234 ++++++++ .../junits/common/GridCommonAbstractTest.java | 19 +- .../testsuites/IgniteCacheTestSuite9.java | 3 + 23 files changed, 1220 insertions(+), 920 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 9bb8aec..ab5b725 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -2906,11 +2906,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ver = newVer; flags &= ~IS_EVICT_DISABLED; - if (cctx.mvccEnabled()) - cctx.offheap().mvccRemoveAll(this); - else - removeValue(); - onInvalidate(); return obsoleteVersionExtras() != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index 52d8525..b5cd82b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -1141,4 +1141,13 @@ public class GridCacheSharedContext<K, V> { public void readOnlyMode(boolean readOnlyMode) { this.readOnlyMode = readOnlyMode; } + + /** + * For test purposes. + * @param txMgr Tx manager. + */ + public void setTxManager(IgniteTxManager txMgr) { + this.txMgr = txMgr; + } + } http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 7313197..4db4685 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -33,7 +33,6 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.failure.FailureContext; import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.InvalidEnvironmentException; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; @@ -55,7 +54,6 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.PartitionUpda import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; -import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; @@ -512,312 +510,267 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter batchStoreCommit(writeMap().values()); - try { - // Node that for near transactions we grab all entries. - for (IgniteTxEntry txEntry : entries) { - GridCacheContext cacheCtx = txEntry.context(); + // Node that for near transactions we grab all entries. + for (IgniteTxEntry txEntry : entries) { + GridCacheContext cacheCtx = txEntry.context(); - boolean replicate = cacheCtx.isDrEnabled(); + boolean replicate = cacheCtx.isDrEnabled(); + while (true) { try { - while (true) { - try { - GridCacheEntryEx cached = txEntry.cached(); + GridCacheEntryEx cached = txEntry.cached(); - if (cached == null) - txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); + if (cached == null) + txEntry.cached(cached = cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); - if (near() && cacheCtx.dr().receiveEnabled()) { - cached.markObsolete(xidVer); + if (near() && cacheCtx.dr().receiveEnabled()) { + cached.markObsolete(xidVer); - break; - } + break; + } - GridNearCacheEntry nearCached = null; + GridNearCacheEntry nearCached = null; - if (updateNearCache(cacheCtx, txEntry.key(), topVer)) - nearCached = cacheCtx.dht().near().peekExx(txEntry.key()); + if (updateNearCache(cacheCtx, txEntry.key(), topVer)) + nearCached = cacheCtx.dht().near().peekExx(txEntry.key()); - if (!F.isEmpty(txEntry.entryProcessors())) - txEntry.cached().unswap(false); + if (!F.isEmpty(txEntry.entryProcessors())) + txEntry.cached().unswap(false); - IgniteBiTuple<GridCacheOperation, CacheObject> res = - applyTransformClosures(txEntry, false, ret); + IgniteBiTuple<GridCacheOperation, CacheObject> res = + applyTransformClosures(txEntry, false, ret); - GridCacheOperation op = res.get1(); - CacheObject val = res.get2(); + GridCacheOperation op = res.get1(); + CacheObject val = res.get2(); - GridCacheVersion explicitVer = txEntry.conflictVersion(); + GridCacheVersion explicitVer = txEntry.conflictVersion(); - if (explicitVer == null) - explicitVer = writeVersion(); + if (explicitVer == null) + explicitVer = writeVersion(); - if (txEntry.ttl() == CU.TTL_ZERO) - op = DELETE; + if (txEntry.ttl() == CU.TTL_ZERO) + op = DELETE; - boolean conflictNeedResolve = cacheCtx.conflictNeedResolve(); + boolean conflictNeedResolve = cacheCtx.conflictNeedResolve(); - GridCacheVersionConflictContext conflictCtx = null; + GridCacheVersionConflictContext conflictCtx = null; - if (conflictNeedResolve) { - IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> - drRes = conflictResolve(op, txEntry, val, explicitVer, cached); + if (conflictNeedResolve) { + IgniteBiTuple<GridCacheOperation, GridCacheVersionConflictContext> + drRes = conflictResolve(op, txEntry, val, explicitVer, cached); - assert drRes != null; + assert drRes != null; - conflictCtx = drRes.get2(); + conflictCtx = drRes.get2(); - if (conflictCtx.isUseOld()) - op = NOOP; - else if (conflictCtx.isUseNew()) { - txEntry.ttl(conflictCtx.ttl()); - txEntry.conflictExpireTime(conflictCtx.expireTime()); - } - else if (conflictCtx.isMerge()) { - op = drRes.get1(); - val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); - explicitVer = writeVersion(); + if (conflictCtx.isUseOld()) + op = NOOP; + else if (conflictCtx.isUseNew()) { + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } + else if (conflictCtx.isMerge()) { + op = drRes.get1(); + val = txEntry.context().toCacheObject(conflictCtx.mergeValue()); + explicitVer = writeVersion(); - txEntry.ttl(conflictCtx.ttl()); - txEntry.conflictExpireTime(conflictCtx.expireTime()); - } - } - else - // Nullify explicit version so that innerSet/innerRemove will work as usual. - explicitVer = null; - - GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null; - - if (!near() && cacheCtx.group().persistenceEnabled() && cacheCtx.group().walEnabled() && - op != NOOP && op != RELOAD && (op != READ || cctx.snapshot().needTxReadLogging())) { - if (dataEntries == null) - dataEntries = new ArrayList<>(entries.size()); - - dataEntries.add( - new T2<>( - new DataEntry( - cacheCtx.cacheId(), - txEntry.key(), - val, - op, - nearXidVersion(), - writeVersion(), - 0, - txEntry.key().partition(), - txEntry.updateCounter() - ), - txEntry - ) - ); - } + txEntry.ttl(conflictCtx.ttl()); + txEntry.conflictExpireTime(conflictCtx.expireTime()); + } + } + else + // Nullify explicit version so that innerSet/innerRemove will work as usual. + explicitVer = null; + + GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null; + + if (!near() && cacheCtx.group().persistenceEnabled() && cacheCtx.group().walEnabled() && + op != NOOP && op != RELOAD && (op != READ || cctx.snapshot().needTxReadLogging())) { + if (dataEntries == null) + dataEntries = new ArrayList<>(entries.size()); + + dataEntries.add( + new T2<>( + new DataEntry( + cacheCtx.cacheId(), + txEntry.key(), + val, + op, + nearXidVersion(), + writeVersion(), + 0, + txEntry.key().partition(), + txEntry.updateCounter() + ), + txEntry + ) + ); + } - if (op == CREATE || op == UPDATE) { - // Invalidate only for near nodes (backups cannot be invalidated). - if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear())) - cached.innerRemove(this, - eventNodeId(), - nodeId, - false, - true, - true, - txEntry.keepBinary(), - txEntry.hasOldValue(), - txEntry.oldValue(), - topVer, - null, - replicate ? DR_BACKUP : DR_NONE, - near() ? null : explicitVer, - CU.subjectId(this, cctx), - resolveTaskName(), - dhtVer, - txEntry.updateCounter(), - mvccSnapshot()); - else { - assert val != null : txEntry; - - GridCacheUpdateTxResult updRes = cached.innerSet(this, - eventNodeId(), - nodeId, - val, - false, - false, - txEntry.ttl(), - true, - true, - txEntry.keepBinary(), - txEntry.hasOldValue(), - txEntry.oldValue(), - topVer, - null, - replicate ? DR_BACKUP : DR_NONE, - txEntry.conflictExpireTime(), - near() ? null : explicitVer, - CU.subjectId(this, cctx), - resolveTaskName(), - dhtVer, - txEntry.updateCounter(), - mvccSnapshot()); - - txEntry.updateCounter(updRes.updateCounter()); - - if (updRes.loggedPointer() != null) - ptr = updRes.loggedPointer(); - - // Keep near entry up to date. - if (nearCached != null) { - CacheObject val0 = cached.valueBytes(); - - nearCached.updateOrEvict(xidVer, - val0, - cached.expireTime(), - cached.ttl(), - nodeId, - topVer); - } - } - } - else if (op == DELETE) { - GridCacheUpdateTxResult updRes = cached.innerRemove(this, - eventNodeId(), + if (op == CREATE || op == UPDATE) { + // Invalidate only for near nodes (backups cannot be invalidated). + if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear())) + cached.innerRemove(this, + eventNodeId(), + nodeId, + false, + true, + true, + txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), + topVer, + null, + replicate ? DR_BACKUP : DR_NONE, + near() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer, + txEntry.updateCounter(), + mvccSnapshot()); + else { + assert val != null : txEntry; + + GridCacheUpdateTxResult updRes = cached.innerSet(this, + eventNodeId(), + nodeId, + val, + false, + false, + txEntry.ttl(), + true, + true, + txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), + topVer, + null, + replicate ? DR_BACKUP : DR_NONE, + txEntry.conflictExpireTime(), + near() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer, + txEntry.updateCounter(), + mvccSnapshot()); + + txEntry.updateCounter(updRes.updateCounter()); + + if (updRes.loggedPointer() != null) + ptr = updRes.loggedPointer(); + + // Keep near entry up to date. + if (nearCached != null) { + CacheObject val0 = cached.valueBytes(); + + nearCached.updateOrEvict(xidVer, + val0, + cached.expireTime(), + cached.ttl(), nodeId, - false, - true, - true, - txEntry.keepBinary(), - txEntry.hasOldValue(), - txEntry.oldValue(), - topVer, - null, - replicate ? DR_BACKUP : DR_NONE, - near() ? null : explicitVer, - CU.subjectId(this, cctx), - resolveTaskName(), - dhtVer, - txEntry.updateCounter(), - mvccSnapshot()); - - txEntry.updateCounter(updRes.updateCounter()); - - if (updRes.loggedPointer() != null) - ptr = updRes.loggedPointer(); - - // Keep near entry up to date. - if (nearCached != null) - nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer); - } - else if (op == RELOAD) { - CacheObject reloaded = cached.innerReload(); - - if (nearCached != null) { - nearCached.innerReload(); - - nearCached.updateOrEvict(cached.version(), - reloaded, - cached.expireTime(), - cached.ttl(), - nodeId, - topVer); - } + topVer); } - else if (op == READ) { - assert near(); - - if (log.isDebugEnabled()) - log.debug("Ignoring READ entry when committing: " + txEntry); - } - // No-op. - else { - if (conflictCtx == null || !conflictCtx.isUseOld()) { - if (txEntry.ttl() != CU.TTL_NOT_CHANGED) - cached.updateTtl(null, txEntry.ttl()); - - if (nearCached != null) { - CacheObject val0 = cached.valueBytes(); - - nearCached.updateOrEvict(xidVer, - val0, - cached.expireTime(), - cached.ttl(), - nodeId, - topVer); - } - } - } - - // Assert after setting values as we want to make sure - // that if we replaced removed entries. - assert - txEntry.op() == READ || onePhaseCommit() || - // If candidate is not there, then lock was explicit - // and we simply allow the commit to proceed. - !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) : - "Transaction does not own lock for commit [entry=" + cached + - ", tx=" + this + ']'; - - // Break out of while loop. - break; - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Attempting to commit a removed entry (will retry): " + txEntry); - - // Renew cached entry. - txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); } } - } - catch (Throwable ex) { - boolean isNodeStopping = X.hasCause(ex, NodeStoppingException.class); - boolean hasInvalidEnvironmentIssue = X.hasCause(ex, InvalidEnvironmentException.class); - - // In case of error, we still make the best effort to commit, - // as there is no way to rollback at this point. - err = new IgniteTxHeuristicCheckedException("Commit produced a runtime exception " + - "(all transaction entries will be invalidated): " + CU.txString(this), ex); - - if (isNodeStopping) { - U.warn(log, "Failed to commit transaction, node is stopping [tx=" + this + - ", err=" + ex + ']'); - } - else if (hasInvalidEnvironmentIssue) { - U.warn(log, "Failed to commit transaction, node is in invalid state and will be stopped [tx=" + this + - ", err=" + ex + ']'); + else if (op == DELETE) { + GridCacheUpdateTxResult updRes = cached.innerRemove(this, + eventNodeId(), + nodeId, + false, + true, + true, + txEntry.keepBinary(), + txEntry.hasOldValue(), + txEntry.oldValue(), + topVer, + null, + replicate ? DR_BACKUP : DR_NONE, + near() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer, + txEntry.updateCounter(), + mvccSnapshot()); + + txEntry.updateCounter(updRes.updateCounter()); + + if (updRes.loggedPointer() != null) + ptr = updRes.loggedPointer(); + + // Keep near entry up to date. + if (nearCached != null) + nearCached.updateOrEvict(xidVer, null, 0, 0, nodeId, topVer); } - else - U.error(log, "Commit failed.", err); - - state(UNKNOWN); - - if (hasInvalidEnvironmentIssue) - cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex)); - else if (!isNodeStopping) { // Skip fair uncommit in case of node stopping or invalidation. - try { - // Courtesy to minimize damage. - uncommit(); + else if (op == RELOAD) { + CacheObject reloaded = cached.innerReload(); + + if (nearCached != null) { + nearCached.innerReload(); + + nearCached.updateOrEvict(cached.version(), + reloaded, + cached.expireTime(), + cached.ttl(), + nodeId, + topVer); } - catch (Throwable ex1) { - U.error(log, "Failed to uncommit transaction: " + this, ex1); + } + else if (op == READ) { + assert near(); - if (ex1 instanceof Error) - throw ex1; + if (log.isDebugEnabled()) + log.debug("Ignoring READ entry when committing: " + txEntry); + } + // No-op. + else { + if (conflictCtx == null || !conflictCtx.isUseOld()) { + if (txEntry.ttl() != CU.TTL_NOT_CHANGED) + cached.updateTtl(null, txEntry.ttl()); + + if (nearCached != null) { + CacheObject val0 = cached.valueBytes(); + + nearCached.updateOrEvict(xidVer, + val0, + cached.expireTime(), + cached.ttl(), + nodeId, + topVer); + } } } - if (ex instanceof Error) - throw (Error) ex; + // Assert after setting values as we want to make sure + // that if we replaced removed entries. + assert + txEntry.op() == READ || onePhaseCommit() || + // If candidate is not there, then lock was explicit + // and we simply allow the commit to proceed. + !cached.hasLockCandidateUnsafe(xidVer) || cached.lockedByUnsafe(xidVer) : + "Transaction does not own lock for commit [entry=" + cached + + ", tx=" + this + ']'; + + // Break out of while loop. + break; + } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Attempting to commit a removed entry (will retry): " + txEntry); - throw err; + // Renew cached entry. + txEntry.cached(cacheCtx.cache().entryEx(txEntry.key(), topologyVersion())); } } + } - // Apply cache size deltas. - applyTxSizes(); + // Apply cache size deltas. + applyTxSizes(); - TxCounters txCntrs = txCounters(false); + TxCounters txCntrs = txCounters(false); - // Apply update counters. - if (txCntrs != null) - applyPartitionsUpdatesCounters(txCntrs.updateCounters()); + // Apply update counters. + if (txCntrs != null) + applyPartitionsUpdatesCounters(txCntrs.updateCounters()); cctx.mvccCaching().onTxFinished(this, true); @@ -827,18 +780,32 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter .map(tuple -> tuple.get1().partitionCounter(tuple.get2().updateCounter())) .collect(Collectors.toList()); - cctx.wal().log(new DataRecord(entriesWithCounters)); - } + cctx.wal().log(new DataRecord(entriesWithCounters)); + } + + if (ptr != null && !cctx.tm().logTxRecords()) + cctx.wal().flush(ptr, false); + } + catch (Throwable ex) { + state(UNKNOWN); - if (ptr != null && !cctx.tm().logTxRecords()) - cctx.wal().flush(ptr, false); + if (X.hasCause(ex, NodeStoppingException.class)) { + U.warn(log, "Failed to commit transaction, node is stopping [tx=" + CU.txString(this) + + ", err=" + ex + ']'); + + return; } - catch (StorageException e) { - err = e; - throw new IgniteCheckedException("Failed to log transaction record " + - "(transaction will be rolled back): " + this, e); + err = heuristicException(ex); + + try { + uncommit(); + } + catch (Throwable e) { + err.addSuppressed(e); } + + throw err; } finally { cctx.database().checkpointReadUnlock(); @@ -878,9 +845,19 @@ public abstract class GridDistributedTxRemoteAdapter extends IgniteTxAdapter throw new IgniteCheckedException("Invalid transaction state for commit [state=" + state + ", tx=" + this + ']'); rollbackRemoteTx(); + + return; } - commitIfLocked(); + try { + commitIfLocked(); + } + catch (IgniteTxHeuristicCheckedException e) { + // Treat heuristic exception as critical. + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + + throw e; + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java index 21eb7b2..9f96b46 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishFuture.java @@ -173,10 +173,13 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (ERR_UPD.compareAndSet(this, null, e)) { tx.setRollbackOnly(); - if (X.hasCause(e, InvalidEnvironmentException.class, NodeStoppingException.class)) + if (X.hasCause(e, NodeStoppingException.class) || cctx.kernalContext().failure().nodeStopping()) onComplete(); - else + else { + // Rolling back a remote transaction may result in partial commit. + // This is only acceptable in tests with no-op failure handler. finish(false); + } } } @@ -230,9 +233,9 @@ public final class GridDhtTxFinishFuture<K, V> extends GridCacheCompoundIdentity if (this.tx.onePhaseCommit() && (this.tx.state() == COMMITTING)) { try { - boolean hasInvalidEnvironmentIssue = X.hasCause(err, InvalidEnvironmentException.class, NodeStoppingException.class); + boolean nodeStopping = X.hasCause(err, NodeStoppingException.class); - this.tx.tmFinish(err == null, hasInvalidEnvironmentIssue, false); + this.tx.tmFinish(err == null, nodeStopping || cctx.kernalContext().failure().nodeStopping(), false); } catch (IgniteCheckedException finishErr) { U.error(log, "Failed to finish tx: " + tx, e); http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index a091d44..ca451f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -39,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; @@ -46,6 +49,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringBuilder; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; @@ -467,7 +471,11 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa ", tx=" + CU.txString(this) + ']'); } catch (IgniteCheckedException e) { - U.error(log, "Failed to finish transaction [commit=" + commit + ", tx=" + this + ']', e); + logTxFinishErrorSafe(log, commit, e); + + // Treat heuristic exception as critical. + if (X.hasCause(e, IgniteTxHeuristicCheckedException.class)) + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); err = e; } http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java index ffa383b..483990f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocalAdapter.java @@ -30,6 +30,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; @@ -43,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTx import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxAdapter; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; import org.apache.ignite.internal.processors.cache.transactions.TxCounters; http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 741faee..c505677 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -36,6 +36,8 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteInterruptedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.IgniteDiagnosticAware; import org.apache.ignite.internal.IgniteDiagnosticPrepareContext; import org.apache.ignite.internal.IgniteInternalFuture; @@ -740,8 +742,6 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite if (tx.commitOnPrepare()) { if (tx.markFinalizing(IgniteInternalTx.FinalizationStatus.USER_FINISH)) { - IgniteInternalFuture<IgniteInternalTx> fut = null; - CIX1<IgniteInternalFuture<IgniteInternalTx>> resClo = new CIX1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void applyx(IgniteInternalFuture<IgniteInternalTx> fut) { @@ -753,42 +753,43 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite } }; - if (prepErr == null) { - try { - fut = tx.commitAsync(); - } - catch (RuntimeException | Error e) { - Exception hEx = new IgniteTxHeuristicCheckedException("Commit produced a runtime " + - "exception: " + CU.txString(tx), e); - - res.error(hEx); + try { + if (prepErr == null) { + try { + tx.commitAsync().listen(resClo); + } + catch (Throwable e) { + res.error(e); - tx.systemInvalidate(true); + tx.systemInvalidate(true); - try { - fut = tx.rollbackAsync(); + try { + tx.rollbackAsync().listen(resClo); + } + catch (Throwable e1) { + e.addSuppressed(e1); + } - fut.listen(resClo); + throw e; } - catch (Throwable e1) { - e.addSuppressed(e1); + } + else if (!cctx.kernalContext().isStopping()) { + try { + tx.rollbackAsync().listen(resClo); } + catch (Throwable e) { + if (err != null) + err.addSuppressed(e); - throw e; + throw err; + } } - } - else if (!cctx.kernalContext().isStopping()) - try { - fut = tx.rollbackAsync(); - } - catch (Throwable e) { - err.addSuppressed(e); - fut = null; - } + catch (Throwable e){ + tx.logTxFinishErrorSafe(log, true, e); - if (fut != null) - fut.listen(resClo); + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java index 4a4d8e3..befa305 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxFinishFuture.java @@ -311,7 +311,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit if (err != null) { tx.setRollbackOnly(); - nodeStop = err instanceof NodeStoppingException; + nodeStop = err instanceof NodeStoppingException || cctx.kernalContext().failure().nodeStopping(); } if (commit) { @@ -357,29 +357,6 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit } if (super.onDone(tx0, err)) { - if (error() instanceof IgniteTxHeuristicCheckedException && !nodeStop) { - AffinityTopologyVersion topVer = tx.topologyVersion(); - - for (IgniteTxEntry e : tx.writeMap().values()) { - GridCacheContext cacheCtx = e.context(); - - try { - if (e.op() != NOOP && !cacheCtx.affinity().keyLocalNode(e.key(), topVer)) { - GridCacheEntryEx entry = cacheCtx.cache().peekEx(e.key()); - - if (entry != null) - entry.invalidate(tx.xidVersion()); - } - } - catch (Throwable t) { - U.error(log, "Failed to invalidate entry.", t); - - if (t instanceof Error) - throw (Error)t; - } - } - } - // Don't forget to clean up. cctx.mvcc().removeFuture(futId); @@ -402,8 +379,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit } /** {@inheritDoc} */ - @Override @SuppressWarnings("ForLoopReplaceableByForEach") - public void finish(final boolean commit, final boolean clearThreadMap, final boolean onTimeout) { + @Override public void finish(final boolean commit, final boolean clearThreadMap, final boolean onTimeout) { if (!cctx.mvcc().addFuture(this, futureId())) return; @@ -490,18 +466,22 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit } } + // Cleanup transaction if heuristic failure. + if (tx.state() == UNKNOWN) + cctx.tm().rollbackTx(tx, clearThreadMap, false); + if ((tx.onePhaseCommit() && needFinishOnePhase(commit)) || (!tx.onePhaseCommit() && mappings != null)) { if (mappings.single()) { GridDistributedTxMapping mapping = mappings.singleMapping(); if (mapping != null) { - assert !hasFutures() || waitTxs != null : futures(); + assert !hasFutures() || isDone() || waitTxs != null : futures(); finish(1, mapping, commit, !clearThreadMap); } } else { - assert !hasFutures() || waitTxs != null : futures(); + assert !hasFutures() || isDone() || waitTxs != null : futures(); finish(mappings.mappings(), commit, !clearThreadMap); } @@ -762,7 +742,7 @@ public final class GridNearTxFinishFuture<K, V> extends GridCacheCompoundIdentit /** * @param mappings Mappings. * @param commit Commit flag. - * @param {@code true} If need to add completed version on finish. + * @param useCompletedVer {@code True} if need to add completed version on finish. */ private void finish(Iterable<GridDistributedTxMapping> mappings, boolean commit, boolean useCompletedVer) { int miniId = 0; http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 76d464e..f56d99b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3955,7 +3955,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou assert rollbackFut.isDone() : rollbackFut; } - else + else // First finish attempt was unsuccessful. Try again. rollbackFut.finish(false, clearThreadMap, onTimeout); } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index b091061..0d3ba75 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -75,6 +75,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx; import org.apache.ignite.internal.processors.cluster.BaselineTopology; +import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridSetWrapper; @@ -764,6 +765,36 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement "[timeout=" + timeout() + ", tx=" + CU.txString(this) + ']'); } + /** + * @param ex Root cause. + */ + public final IgniteCheckedException heuristicException(Throwable ex) { + return new IgniteTxHeuristicCheckedException("Committing a transaction has produced runtime exception", ex); + } + + /** + * @param log Log. + * @param commit Commit. + * @param e Exception. + */ + public void logTxFinishErrorSafe(@Nullable IgniteLogger log, boolean commit, Throwable e) { + assert e != null : "Exception is expected"; + + final String fmt = "Failed completing the transaction: [commit=%s, tx=%s, plc=%s]"; + + try { + // First try printing a full transaction. This is error prone. + U.error(log, String.format(fmt, commit, this, + cctx.gridConfig().getFailureHandler().getClass().getSimpleName()), e); + } + catch (Throwable e0) { + e.addSuppressed(e0); + + U.error(log, String.format(fmt, commit, CU.txString(this), + cctx.gridConfig().getFailureHandler().getClass().getSimpleName()), e); + } + } + /** {@inheritDoc} */ @Override public GridCacheVersion xidVersion() { return xidVer; http://git-wip-us.apache.org/repos/asf/ignite/blob/5eb871e1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java index 4c7b65d..895a9d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxHandler.java @@ -1030,45 +1030,34 @@ public class IgniteTxHandler { } catch (Throwable e) { try { - U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); - } - catch (Throwable e0) { - ClusterNode node0 = ctx.discovery().node(nodeId); - - U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + - CU.txString(tx) + ']', e); - - U.error(log, "Failed to log message due to an error: ", e0); + if (tx != null) { + tx.commitError(e); - if (node0 != null && (!node0.isClient() || node0.isLocal())) { - ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + tx.systemInvalidate(true); - throw e; - } - } - - if (tx != null) { - tx.commitError(e); - - tx.systemInvalidate(true); + try { + IgniteInternalFuture<IgniteInternalTx> res = tx.rollbackDhtLocalAsync(); - try { - IgniteInternalFuture<IgniteInternalTx> res = tx.rollbackDhtLocalAsync(); + // Only for error logging. + res.listen(CU.errorLogger(log)); - // Only for error logging. - res.listen(CU.errorLogger(log)); + return res; + } + catch (Throwable e1) { + e.addSuppressed(e1); + } - return res; + tx.logTxFinishErrorSafe(log, req.commit(), e); } - catch (Throwable e1) { - e.addSuppressed(e1); - } - } - if (e instanceof Error) - throw (Error)e; + if (e instanceof Error) + throw (Error)e; - return new GridFinishedFuture<>(e); + return new GridFinishedFuture<>(e); + } + finally { + ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } } } @@ -1093,20 +1082,26 @@ public class IgniteTxHandler { return tx.rollbackAsyncLocal(); } catch (Throwable e) { - U.error(log, "Failed completing transaction [commit=" + commit + ", tx=" + tx + ']', e); - - if (e instanceof Error) - throw e; + try { + if (tx != null) { + try { + return tx.rollbackNearTxLocalAsync(); + } + catch (Throwable e1) { + e.addSuppressed(e1); + } - if (tx != null) - try { - return tx.rollbackNearTxLocalAsync(); - } - catch (Throwable e1) { - e.addSuppressed(e1); + tx.logTxFinishErrorSafe(log, commit, e); } - return new GridFinishedFuture<>(e); + if (e instanceof Error) + throw e; + + return new GridFinishedFuture<>(e); + } + finally { + ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } } } @@ -1193,10 +1188,6 @@ public class IgniteTxHandler { if (log.isDebugEnabled()) log.debug("Optimistic failure for remote transaction (will rollback): " + req); } - else if (e instanceof IgniteTxHeuristicCheckedException) { - U.warn(log, "Failed to commit transaction (all transaction entries were invalidated): " + - CU.txString(dhtTx)); - } else U.error(log, "Failed to process prepare request: " + req, e); @@ -1421,9 +1412,10 @@ public class IgniteTxHandler { tx.rollbackRemoteTx(); } } + catch (IgniteTxHeuristicCheckedException e) { + // Already uncommitted. + } catch (Throwable e) { - U.error(log, "Failed completing transaction [commit=" + req.commit() + ", tx=" + tx + ']', e); - // Mark transaction for invalidate. tx.invalidate(true); tx.systemInvalidate(true); @@ -1441,6 +1433,8 @@ public class IgniteTxHandler { } /** + * Finish for one-phase distributed tx. + * * @param tx Transaction. * @param req Request. */ @@ -1464,22 +1458,27 @@ public class IgniteTxHandler { throw e; } catch (Throwable e) { - U.error(log, "Failed committing transaction [tx=" + tx + ']', e); + try { + // Mark transaction for invalidate. + tx.invalidate(true); - // Mark transaction for invalidate. - tx.invalidate(true); - tx.systemInvalidate(true); + tx.systemInvalidate(true); - try { - tx.rollbackRemoteTx(); + try { + tx.rollbackRemoteTx(); + } + catch (Throwable e1) { + e.addSuppressed(e1); + } + + tx.logTxFinishErrorSafe(log, true, e); + + if (e instanceof Error) + throw (Error)e; } - catch (Throwable e1) { - e.addSuppressed(e1); - U.error(log, "Failed to automatically rollback transaction: " + tx, e1); + finally { + ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); } - - if (e instanceof Error) - throw (Error)e; } }
