ignite-6181-1
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f950cb15 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f950cb15 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f950cb15 Branch: refs/heads/ignite-6181-1 Commit: f950cb15d55b6858987e3baca31ce1d350ec0c5d Parents: ea5cd0d Author: sboikov <[email protected]> Authored: Tue Sep 19 12:55:06 2017 +0300 Committer: sboikov <[email protected]> Committed: Tue Sep 19 12:56:44 2017 +0300 ---------------------------------------------------------------------- .../cache/distributed/near/GridNearTxLocal.java | 81 +++++--------- .../transactions/IgniteTransactionsImpl.java | 3 +- .../cache/transactions/IgniteTxManager.java | 15 +-- .../cache/IgniteTxConfigCacheSelfTest.java | 2 +- .../IgniteOptimisticTxSuspendResumeTest.java | 3 +- .../transactions/TxRollbackOnTimeoutTest.java | 109 +++++++++++++++---- .../hadoop/impl/HadoopTxConfigCacheTest.java | 2 +- 7 files changed, 121 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f950cb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index 0e75b69..ad3b464 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -46,7 +46,6 @@ import org.apache.ignite.internal.processors.cache.EntryGetResult; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheOperation; @@ -57,14 +56,11 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocalAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareFuture; -import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtColocatedLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; -import org.apache.ignite.internal.processors.cache.local.GridLocalLockFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -94,7 +90,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.transactions.TransactionConcurrency; @@ -140,13 +135,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou private static final AtomicReferenceFieldUpdater<GridNearTxLocal, GridNearTxFinishFuture> ROLLBACK_FUT_UPD = AtomicReferenceFieldUpdater.newUpdater(GridNearTxLocal.class, GridNearTxFinishFuture.class, "rollbackFut"); - /** Lock future predicate. */ - private static final IgnitePredicate<GridCacheFuture<?>> LOCK_FUTURES = new IgnitePredicate<GridCacheFuture<?>>() { - @Override public boolean apply(GridCacheFuture<?> fut) { - return fut instanceof GridDhtColocatedLockFuture; - } - }; - /** DHT mappings. */ private IgniteTxMappings mappings; @@ -242,10 +230,10 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou mappings = implicitSingle ? new IgniteTxMappingsSingleImpl() : new IgniteTxMappingsImpl(); - if (this.timeout() > 0 && !implicit()) - cctx.time().addTimeoutObject(this); - initResult(); + + if (trackTimeout()) + cctx.time().addTimeoutObject(this); } /** {@inheritDoc} */ @@ -446,12 +434,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou Object... invokeArgs ) { return (IgniteInternalFuture<GridCacheReturn>)putAllAsync0(cacheCtx, - entryTopVer, - null, - map, - invokeArgs, - null, - true); + entryTopVer, + null, + map, + invokeArgs, + null, + true); } /** @@ -470,12 +458,12 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou }); return this.<Object, Object>putAllAsync0(cacheCtx, - null, - map, - null, - null, - drMap, - false); + null, + map, + null, + null, + drMap, + false); } /** @@ -2857,7 +2845,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (log.isDebugEnabled()) log.debug("Added mappings to transaction [locId=" + cctx.localNodeId() + ", key=" + key + ", node=" + node + - ", tx=" + this + ']'); + ", tx=" + this + ']'); } /** @@ -3532,7 +3520,7 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } catch (IgniteCheckedException e) { log.debug("Failed to prepare transaction during rollback (will ignore) [tx=" + this + ", msg=" + - e.getMessage() + ']'); + e.getMessage() + ']'); } fut.finish(false); @@ -3710,11 +3698,8 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (state != ROLLING_BACK && state != ROLLED_BACK && state != COMMITTING && state != COMMITTED) rollback(); - else { - cctx.tm().onLocalClose(this); - + else removeTimeoutHandler(); - } synchronized (this) { try { @@ -4026,10 +4011,17 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou } /** + * @return {@code True} if need register callback which cancels tx on timeout. + */ + private boolean trackTimeout() { + return timeout() > 0 && !implicit(); + } + + /** * Removes timeout handler. */ private void removeTimeoutHandler() { - if (timeout() > 0 && !implicit()) + if (trackTimeout()) cctx.time().removeTimeoutObject(this); } @@ -4048,27 +4040,6 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements GridTimeou if (state(MARKED_ROLLBACK, true)) { cctx.kernalContext().closure().runLocalSafe(new Runnable() { @Override public void run() { - // Wait for active local lock futures completion to prevent races with deadlock detection. - Collection<GridCacheFuture<?>> lockFuts = F.view(cctx.mvcc().activeFutures(), LOCK_FUTURES); - - for (GridCacheFuture<?> fut : lockFuts) { - try { - GridDhtColocatedLockFuture locFut = (GridDhtColocatedLockFuture)fut; - - if (locFut.timeout() > 0) - fut.get(); - } - catch (IgniteCheckedException e) { - log.error("Failed to wait for lock future completion [fut=" + fut + ']', e); - } - } - - if (state() != MARKED_ROLLBACK) - return; - - log().error("Transaction is timed out and will be rolled back [timeout=" + timeout() + - ", tx=" + GridNearTxLocal.this + ']'); - rollbackNearTxLocalAsync(); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/f950cb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java index 7adfb82..12f655a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTransactionsImpl.java @@ -154,8 +154,7 @@ public class IgniteTransactionsImpl<K, V> implements IgniteTransactionsEx { try { GridNearTxLocal tx = cctx.tm().userTx(sysCacheCtx); - // Allow overriding timed out transaction. - if (tx != null && !tx.timedOut()) + if (tx != null) throw new IllegalStateException("Failed to start new transaction " + "(current thread already has a transaction): " + tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/f950cb15/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 827f2f6..6aa04ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1422,7 +1422,9 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ private void clearThreadMap(IgniteInternalTx tx) { if (tx.local() && !tx.dht()) { - if (tx.system()) { + if (!tx.system()) + threadMap.remove(tx.threadId(), tx); + else { Integer cacheId = tx.txState().firstCacheId(); if (cacheId != null) @@ -1439,9 +1441,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } } } - else if (!tx.timedOut()) - threadMap.remove(tx.threadId(), tx); - } } @@ -1712,13 +1711,6 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * Removes tx from map. - */ - public void onLocalClose(IgniteInternalTx tx) { - threadMap.remove(tx.threadId(), tx); - } - - /** * @return All transactions. */ public Collection<IgniteInternalTx> txs() { @@ -2284,6 +2276,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { */ public void resumeTx(GridNearTxLocal tx) throws IgniteCheckedException { assert tx != null && !tx.system() : tx; + assert !threadMap.containsValue(tx) : tx; assert !transactionMap(tx).containsValue(tx) : tx; assert !haveSystemTxForThread(Thread.currentThread().getId()); http://git-wip-us.apache.org/repos/asf/ignite/blob/f950cb15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java index af09858..f2e17e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteTxConfigCacheSelfTest.java @@ -63,7 +63,7 @@ public class IgniteTxConfigCacheSelfTest extends GridCommonAbstractTest { private static final String CACHE_NAME = "cache_name"; /** Timeout of transaction. */ - private static final long TX_TIMEOUT = 300; + private static final long TX_TIMEOUT = 100; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { http://git-wip-us.apache.org/repos/asf/ignite/blob/f950cb15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java index 7b88b6d..ae691f5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteOptimisticTxSuspendResumeTest.java @@ -45,7 +45,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; import static org.apache.ignite.transactions.TransactionState.ACTIVE; import static org.apache.ignite.transactions.TransactionState.COMMITTED; -import static org.apache.ignite.transactions.TransactionState.MARKED_ROLLBACK; import static org.apache.ignite.transactions.TransactionState.ROLLED_BACK; import static org.apache.ignite.transactions.TransactionState.SUSPENDED; @@ -54,7 +53,7 @@ import static org.apache.ignite.transactions.TransactionState.SUSPENDED; */ public class IgniteOptimisticTxSuspendResumeTest extends GridCommonAbstractTest { /** Transaction timeout. */ - private static final long TX_TIMEOUT = 300; + private static final long TX_TIMEOUT = 100; /** Future timeout */ private static final int FUT_TIMEOUT = 5000; http://git-wip-us.apache.org/repos/asf/ignite/blob/f950cb15/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java index 360cba1..2d6264c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxRollbackOnTimeoutTest.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.transactions; import java.util.concurrent.CountDownLatch; import javax.cache.CacheException; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.CacheConfiguration; @@ -29,10 +30,13 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.util.GridConcurrentSkipListSet; +import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; @@ -106,13 +110,83 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { stopAllGrids(); } - /** */ + /** + * @param e Exception. + */ protected void validateException(Exception e) { assertEquals("Deadlock report is expected", TransactionDeadlockException.class, e.getCause().getCause().getClass()); } /** + * @throws Exception If failed. + */ + public void testLockAndConcurrentTimeout() throws Exception { + startGrid("client"); + + for (Ignite node : G.allGrids()) { + log.info("Test with node: " + node.name()); + + lock(node, false); + + lock(node, false); + + lock(node, true); + } + } + + /** + * @param node Node. + * @param retry {@code True} + * @throws Exception If failed. + */ + private void lock(final Ignite node, final boolean retry) throws Exception { + final IgniteCache<Object, Object> cache = node.cache(CACHE_NAME); + + final int KEYS_PER_THREAD = 10_000; + + GridTestUtils.runMultiThreaded(new IgniteInClosure<Integer>() { + @Override public void apply(Integer idx) { + int start = idx * KEYS_PER_THREAD; + int end = start + KEYS_PER_THREAD; + + int locked = 0; + + try { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 500, 0)) { + for (int i = start; i < end; i++) { + cache.get(i); + + locked++; + } + + tx.commit(); + } + } + catch (Exception e) { + info("Expected error: " + e); + } + + info("Done, locked: " + locked); + + if (retry) { + try (Transaction tx = node.transactions().txStart(PESSIMISTIC, REPEATABLE_READ, 10 * 60_000, 0)) { + for (int i = start; i < end; i++) { + cache.get(i); + + locked++; + } + + cache.put(start, 0); + + tx.commit(); + } + } + } + }, 8, "tx-thread"); + } + + /** * Tests if timeout on first tx unblocks second tx waiting for the locked key. * * @throws Exception If failed. @@ -245,7 +319,10 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { /** * Tests if deadlock is resolved on timeout with correct message. - * @throws Exception + * + * @param node1 First node. + * @param node2 Second node. + * @throws Exception If failed. */ private void testDeadlockUnblockedOnTimeout0(final Ignite node1, final Ignite node2) throws Exception { final CountDownLatch l = new CountDownLatch(2); @@ -265,7 +342,8 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { fail(); } - } catch (CacheException e) { + } + catch (CacheException e) { // No-op. validateException(e); } @@ -352,7 +430,8 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { tx.commit(); fail("Tx must timeout"); - } catch (IgniteException e) { + } + catch (IgniteException e) { assertTrue("Expected timeout exception", X.hasCause(e, TransactionTimeoutException.class)); } } @@ -364,6 +443,7 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { * @param near Node. * @param mode Test mode. * + * @param timeout Tx timeout. * @throws Exception If failed. */ private void testTimeoutRemoval0(IgniteEx near, int mode, long timeout) throws Exception { @@ -402,12 +482,13 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { GridConcurrentSkipListSet set = U.field(near.context().cache().context().time(), "timeoutObjs"); - for (Object obj : set) + for (Object obj : set) { if (obj.getClass().isAssignableFrom(GridNearTxLocal.class)) { - log.error("Last saved exception", saved); + log.error("Last saved exception: " + saved, saved); fail("Not remove for mode=" + mode + " and timeout=" + timeout); } + } } /** @@ -525,20 +606,4 @@ public class TxRollbackOnTimeoutTest extends GridCommonAbstractTest { fut2.get(); } - - /** - * Returns root cause for an exception. - * @param t Throwable. - * - * @return Root cause or input if none. - */ - private static Throwable getRootCause(Throwable t) { - Throwable cause; - Throwable res = t; - - while(null != (cause = res.getCause()) && (res != cause) ) - res = cause; - - return res; - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f950cb15/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java index 554c489..f404732 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopTxConfigCacheTest.java @@ -29,7 +29,7 @@ public class HadoopTxConfigCacheTest extends IgniteTxConfigCacheSelfTest { /** * Success if system caches weren't timed out. * - * @throws Exception + * @throws Exception If failed. */ public void testSystemCacheTx() throws Exception { final Ignite ignite = grid(0);
