IGNITE-6181 wip.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6a928460 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6a928460 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6a928460 Branch: refs/heads/ignite-6181-1 Commit: 6a928460877ba42d965cb7ec38e7962f338eacbe Parents: d5f3a67 Author: Aleksei Scherbakov <[email protected]> Authored: Fri Sep 1 19:17:31 2017 +0300 Committer: Aleksei Scherbakov <[email protected]> Committed: Fri Sep 1 19:17:31 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 6 ++++++ .../cache/distributed/near/GridNearTxLocal.java | 21 ++++++++++---------- .../cache/transactions/IgniteInternalTx.java | 2 +- .../transactions/IgniteTransactionsImpl.java | 4 +++- .../cache/transactions/IgniteTxAdapter.java | 4 +++- .../transactions/IgniteTxLocalAdapter.java | 3 --- .../cache/transactions/IgniteTxManager.java | 20 +++++++++++++++++-- .../cache/IgniteTxConfigCacheSelfTest.java | 12 +++++++++++ .../transactions/TxRollbackOnTimeoutTest.java | 2 +- 9 files changed, 54 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index fed716c..2be738a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -102,6 +102,7 @@ import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFi import org.apache.ignite.internal.processors.task.GridInternal; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -138,6 +139,7 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; +import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; import org.jsr166.LongAdder8; @@ -4032,6 +4034,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + if (tx != null && tx.state() == TransactionState.ROLLED_BACK && tx.timedOut()) + throw new IgniteTxTimeoutCheckedException("Previous transaction was rolled back by timeout. " + + "Please start new transaction and retry an operation."); + if (tx == null || tx.implicit()) { TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 84c9790..f5ecdd8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -109,6 +109,7 @@ import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxE import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry.SER_READ_NOT_EMPTY_VER; import static org.apache.ignite.transactions.TransactionState.COMMITTED; import static org.apache.ignite.transactions.TransactionState.COMMITTING; +import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.PREPARED; import static org.apache.ignite.transactions.TransactionState.PREPARING; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; @@ -182,9 +183,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou private TransactionProxyImpl proxy; /** */ - private long endTime; - - /** */ private transient Exception trackE; /** @@ -246,8 +244,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (TRACK_TRANSACTION_INITIATOR) trackE = new Exception(); - endTime = U.currentTimeMillis() + this.timeout; - cctx.time().addTimeoutObject(this); } @@ -3169,7 +3165,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou // Prepare was called explicitly. return fut; - if (endTime > 0) + if (timeout() > 0) cctx.time().removeTimeoutObject(this); mapExplicitLocks(); @@ -3274,7 +3270,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou * @throws IgniteCheckedException If failed. */ public void rollback() throws IgniteCheckedException { - if (endTime > 0) + if (timeout() > 0) cctx.time().removeTimeoutObject(this); rollbackNearTxLocalAsync().get(); @@ -3719,6 +3715,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) rollback(); + cctx.tm().onLocalClose(threadId(), this); + synchronized (this) { try { while (!done()) @@ -4080,15 +4078,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou /** {@inheritDoc} */ @Override public long endTime() { - return endTime; + return startTime() + timeout(); } /** {@inheritDoc} */ @Override public void onTimeout() { - log.error("Transaction is timed out and will be rolled back: [tx=" + this + ']', trackE); + if (state(MARKED_ROLLBACK, true)) { + log.error("Transaction is timed out and will be rolled back: [" + this + ']', trackE); - if (setRollbackOnly()) - rollbackAsync(); + rollbackNearTxLocalAsync(); + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java index 7598003..9e06d9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteInternalTx.java @@ -634,4 +634,4 @@ public interface IgniteInternalTx { * @param e Commit error. */ public void commitError(Throwable e); -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 12f655a..9891484 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -29,6 +29,7 @@ import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionMetrics; import org.apache.ignite.transactions.TransactionException; +import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; /** @@ -154,7 +155,8 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { try { GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx); - if (tx != null) + // Allow to start new transaction if previous transaction was rolled back by timeout. + if (tx != null && !(tx.state() == TransactionState.ROLLED_BACK && tx.timedOut())) throw new IllegalStateException("Failed to start new transaction " + "(current thread already has a transaction): " + tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java index 65f8af2..7d4464a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java @@ -1047,7 +1047,9 @@ public abstract class IgniteTxAdapter extends GridMetadataAwareAdapter implement if (valid) { this.state = state; - this.timedOut = timedOut; + + if (timedOut) + this.timedOut = true; if (log.isDebugEnabled()) log.debug("Changed transaction state [prev=" + prev + ", new=" + this.state + ", tx=" + this + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index e7ebaae..ea105ae 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -1237,9 +1237,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter implements Ig * @throws IgniteCheckedException If transaction check failed. */ protected void checkValid() throws IgniteCheckedException { - if (local() && !dht() && remainingTime() == -1) - state(MARKED_ROLLBACK, true); - if (isRollbackOnly()) { if (remainingTime() == -1) throw new IgniteTxTimeoutCheckedException("Cache transaction timed out: " + this); http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index a3bf205..ee7d922 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -31,6 +31,7 @@ import java.util.UUID; import java.util.concurrent.ConcurrentMap; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteClientDisconnectedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryObjectException; import org.apache.ignite.cluster.ClusterNode; @@ -1423,8 +1424,14 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ private void clearThreadMap(IgniteInternalTx tx) { if (tx.local() && !tx.dht()) { - if (!tx.system()) - threadMap.remove(tx.threadId(), tx); + if (!tx.system()) { + /** + * Timed out local transactions are cleared in {@link #onLocalClose} + * TODO reduce heap size of timed out transaction. + */ + if (!(tx.timedOut() && tx.pessimistic() && tx.local() && !tx.dht())) + threadMap.remove(tx.threadId(), tx); + } else { Integer cacheId = tx.txState().firstCacheId(); @@ -2316,6 +2323,15 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** + * Remove thread completely. + * @param threadId Thread ID. + * @param tx Local transaction. + */ + public void onLocalClose(long threadId, GridNearTxLocal tx) { + threadMap.remove(threadId, tx); + } + + /** * Timeout object for node failure handler. */ private final class NodeFailureTimeoutObject extends GridTimeoutObjectAdapter { http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java index 2efa0cb..8574f0c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java @@ -185,6 +185,8 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { try (final Transaction tx = ignite.transactions().txStart()) { assert tx != null; + cache.put("key0", "val0"); + sleepForTxFailure(); cache.put("key", "val"); @@ -195,7 +197,17 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { assert e.getCause() instanceof TransactionTimeoutException; } + assert !cache.containsKey("key0"); assert !cache.containsKey("key"); + + // New transaction must succeed. + try (final Transaction tx = ignite.transactions().txStart()) { + cache.put("key", "val"); + + tx.commit(); + } + + assert cache.containsKey("key"); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/6a928460/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java index 9243f58..bb1bbc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -266,7 +266,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { tx.commit(); } } - }, 2, "Second"); + }, 1, "Second"); fut2.get();
