This is an automated email from the ASF dual-hosted git repository. irakov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 83f3baa IGNITE-12076 Fixed PME hang on client node caused by optimistic transactions and cache destroying - Fixes #6779. 83f3baa is described below commit 83f3baa60f7faf031279ec43772de7f3c1c8c769 Author: Slava Koptilin <slava.kopti...@gmail.com> AuthorDate: Fri Aug 30 20:28:31 2019 +0300 IGNITE-12076 Fixed PME hang on client node caused by optimistic transactions and cache destroying - Fixes #6779. Signed-off-by: Ivan Rakov <ira...@apache.org> --- .../processors/cache/GridCacheContext.java | 42 +-- .../processors/cache/GridCacheEventManager.java | 14 +- .../processors/cache/GridCacheProcessor.java | 31 +-- .../processors/cache/GridCacheProxyImpl.java | 6 +- ...dNearOptimisticSerializableTxPrepareFuture.java | 10 + .../near/GridNearOptimisticTxPrepareFuture.java | 8 + .../GridNearOptimisticTxPrepareFutureAdapter.java | 18 ++ .../cache/transactions/IgniteTxManager.java | 71 +++-- .../cache/transactions/IgniteTxStateImpl.java | 24 +- .../cache/transactions/TxOnCachesStopTest.java | 288 ++++++++++++++++++++- 10 files changed, 425 insertions(+), 87 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 9d52c75..9ddafb2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -675,14 +675,14 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if cache is replicated cache. */ public boolean isReplicated() { - return cacheCfg.getCacheMode() == CacheMode.REPLICATED; + return config().getCacheMode() == CacheMode.REPLICATED; } /** * @return {@code True} if cache is partitioned cache. */ public boolean isPartitioned() { - return cacheCfg.getCacheMode() == CacheMode.PARTITIONED; + return config().getCacheMode() == CacheMode.PARTITIONED; } /** @@ -696,7 +696,7 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} in case cache supports query. */ public boolean isQueryEnabled() { - return !F.isEmpty(cacheCfg.getQueryEntities()); + return !F.isEmpty(config().getQueryEntities()); } /** @@ -849,21 +849,23 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if atomic. */ public boolean atomic() { - return cacheCfg.getAtomicityMode() == ATOMIC; + return config().getAtomicityMode() == ATOMIC; } /** * @return {@code True} if transactional. */ public boolean transactional() { - return cacheCfg.getAtomicityMode() == TRANSACTIONAL || cacheCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; + CacheConfiguration cfg = config(); + + return cfg.getAtomicityMode() == TRANSACTIONAL || cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; } /** * @return {@code True} if transactional snapshot. */ public boolean transactionalSnapshot() { - return cacheCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; + return config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT; } /** @@ -1043,9 +1045,15 @@ public class GridCacheContext<K, V> implements Externalizable { /** * @return Cache configuration for given cache instance. + * @throws IllegalStateException If this cache context was cleaned up. */ public CacheConfiguration config() { - return cacheCfg; + CacheConfiguration res = cacheCfg; + + if (res == null) + throw new IllegalStateException((new CacheStoppedException(name()))); + + return res; } /** @@ -1054,7 +1062,7 @@ public class GridCacheContext<K, V> implements Externalizable { * are set to {@code true} or the store is local. */ public boolean writeToStoreFromDht() { - return store().isLocal() || cacheCfg.isWriteBehindEnabled(); + return store().isLocal() || config().isWriteBehindEnabled(); } /** @@ -1488,56 +1496,56 @@ public class GridCacheContext<K, V> implements Externalizable { * @return {@code True} if store read-through mode is enabled. */ public boolean readThrough() { - return cacheCfg.isReadThrough() && !skipStore(); + return config().isReadThrough() && !skipStore(); } /** * @return {@code True} if store and read-through mode are enabled in configuration. */ public boolean readThroughConfigured() { - return store().configured() && cacheCfg.isReadThrough(); + return store().configured() && config().isReadThrough(); } /** * @return {@code True} if {@link CacheConfiguration#isLoadPreviousValue()} flag is set. */ public boolean loadPreviousValue() { - return cacheCfg.isLoadPreviousValue(); + return config().isLoadPreviousValue(); } /** * @return {@code True} if store write-through is enabled. */ public boolean writeThrough() { - return cacheCfg.isWriteThrough() && !skipStore(); + return config().isWriteThrough() && !skipStore(); } /** * @return {@code True} if invalidation is enabled. */ public boolean isInvalidate() { - return cacheCfg.isInvalidate(); + return config().isInvalidate(); } /** * @return {@code True} if synchronous commit is enabled. */ public boolean syncCommit() { - return cacheCfg.getWriteSynchronizationMode() == FULL_SYNC; + return config().getWriteSynchronizationMode() == FULL_SYNC; } /** * @return {@code True} if synchronous rollback is enabled. */ public boolean syncRollback() { - return cacheCfg.getWriteSynchronizationMode() == FULL_SYNC; + return config().getWriteSynchronizationMode() == FULL_SYNC; } /** * @return {@code True} if only primary node should be updated synchronously. */ public boolean syncPrimary() { - return cacheCfg.getWriteSynchronizationMode() == PRIMARY_SYNC; + return config().getWriteSynchronizationMode() == PRIMARY_SYNC; } /** @@ -1753,7 +1761,7 @@ public class GridCacheContext<K, V> implements Externalizable { * of {@link CacheConfiguration#isCopyOnRead()}. */ public boolean needValueCopy() { - return affNode && cacheCfg.isCopyOnRead(); + return affNode && config().isCopyOnRead(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index 501da08..c095ebe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import java.util.Collection; import java.util.UUID; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.events.CacheEvent; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; @@ -388,11 +389,18 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { GridCacheContext cctx0 = cctx; // Event recording is impossible in recovery mode. - if (cctx0 != null && cctx0.kernalContext().recoveryMode()) + if (cctx0 == null || cctx0.kernalContext().recoveryMode()) return false; - return cctx0 != null && cctx0.userCache() && cctx0.gridEvents().isRecordable(type) - && cctx0.config() != null && !cctx0.config().isEventsDisabled(); + try { + CacheConfiguration cfg = cctx0.config(); + + return cctx0.userCache() && cctx0.gridEvents().isRecordable(type) && !cfg.isEventsDisabled(); + } + catch (IllegalStateException e) { + // Cache context was cleaned up. + return false; + } } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 44a985c..7d4e4bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2691,6 +2691,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { for (ExchangeActions.CacheActionData action: cachesToStopByGrp.getValue()) { stopGateway(action.request()); + context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId()); + sharedCtx.database().checkpointReadLock(); try { @@ -2829,40 +2831,11 @@ public class GridCacheProcessor extends GridProcessorAdapter { ((GridServiceProcessor)ctx.service()).updateUtilityCache(); } - rollbackCoveredTx(exchActions); - if (err == null) processCacheStopRequestOnExchangeDone(exchActions); } /** - * Rollback tx covered by stopped caches. - * - * @param exchActions Change requests. - */ - private void rollbackCoveredTx(ExchangeActions exchActions) { - if (!exchActions.cacheGroupsToStop().isEmpty() || !exchActions.cacheStopRequests().isEmpty()) { - Set<Integer> cachesToStop = new HashSet<>(); - - for (ExchangeActions.CacheGroupActionData act : exchActions.cacheGroupsToStop()) { - @Nullable CacheGroupContext grpCtx = context().cache().cacheGroup(act.descriptor().groupId()); - - if (grpCtx != null && grpCtx.sharedGroup()) - cachesToStop.addAll(grpCtx.cacheIds()); - } - - for (ExchangeActions.CacheActionData act : exchActions.cacheStopRequests()) - cachesToStop.add(act.descriptor().cacheId()); - - if (!cachesToStop.isEmpty()) { - IgniteTxManager tm = context().tm(); - - tm.rollbackTransactionsForCaches(cachesToStop); - } - } - } - - /** * @param grpId Group ID. */ private void stopCacheGroup(int grpId) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java index 6fd4269..eee17de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java @@ -104,7 +104,11 @@ public class GridCacheProxyImpl<K, V> implements IgniteInternalCache<K, V>, Exte gate = ctx.gate(); - aff = new GridCacheAffinityProxy<>(ctx, ctx.cache().affinity()); + GridCacheAdapter adapter = ctx.cache(); + if (adapter == null) + throw new IllegalStateException(new CacheStoppedException(ctx.name())); + + aff = new GridCacheAffinityProxy<>(ctx, adapter.affinity()); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 46bab86..1a7cfb0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; @@ -969,6 +971,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim * @param res Response. */ private void remap(final GridNearTxPrepareResponse res) { + if (parent.tx.isRollbackOnly()) { + onDone(new IgniteTxRollbackCheckedException( + "Failed to prepare the transaction, due to the transaction is marked as rolled back " + + "[tx=" + CU.txString(parent.tx) + ']')); + + return; + } + parent.prepareOnTopology(true, new Runnable() { @Override public void run() { onDone(res); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 140f593..791e018 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -1013,6 +1013,14 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa * */ private void remap() { + if (parent.tx.isRollbackOnly()) { + onDone(new IgniteTxRollbackCheckedException( + "Failed to prepare the transaction, due to the transaction is marked as rolled back " + + "[tx=" + CU.txString(parent.tx) + ']')); + + return; + } + parent.prepareOnTopology(true, new Runnable() { @Override public void run() { onDone((GridNearTxPrepareResponse) null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java index 13b03fe..fb62880 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java @@ -25,11 +25,13 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.jetbrains.annotations.Nullable; @@ -178,6 +180,14 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT return; } + if (tx.isRollbackOnly()) { + onDone(new IgniteTxRollbackCheckedException( + "Failed to prepare the transaction, due to the transaction is marked as rolled back " + + "[tx=" + CU.txString(tx) + ']')); + + return; + } + prepare0(remap, false); if (c != null) @@ -189,6 +199,14 @@ public abstract class GridNearOptimisticTxPrepareFutureAdapter extends GridNearT return; try { + if (tx.isRollbackOnly()) { + onDone(new IgniteTxRollbackCheckedException( + "Failed to prepare the transaction, due to the transaction is marked as rolled back " + + "[tx=" + CU.txString(tx) + ']')); + + return; + } + prepareOnTopology(remap, c); } finally { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index 6ff8b77..d73dc39 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -79,6 +79,7 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cluster.BaselineTopology; import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -327,35 +328,67 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { } /** - * @param cachesToStop Caches to stop. + * @param cacheToStop Cache to stop. */ - public void rollbackTransactionsForCaches(Set<Integer> cachesToStop) { - if (!cachesToStop.isEmpty()) { - IgniteTxManager tm = context().tm(); + public void rollbackTransactionsForStoppingCache(int cacheToStop) { + GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> compFut = new GridCompoundFuture<>(); - Collection<IgniteInternalTx> active = tm.activeTransactions(); + Collection<IgniteInternalTx> active = activeTransactions(); - GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> compFut = new GridCompoundFuture<>(); + for (IgniteInternalTx tx : active) { + IgniteTxState state = tx.txState(); - for (IgniteInternalTx tx : active) { - for (IgniteTxEntry e : tx.allEntries()) { - if (cachesToStop.contains(e.context().cacheId())) { - compFut.add(tx.rollbackAsync()); + Collection<IgniteTxEntry> txEntries = + state instanceof IgniteTxStateImpl ? ((IgniteTxStateImpl)state).allEntriesCopy() : state.allEntries(); - break; - } + for (IgniteTxEntry e : txEntries) { + if (e.context().cacheId() == cacheToStop) { + compFut.add(failTxOnPreparing(tx)); + + break; } } + } - compFut.markInitialized(); + compFut.markInitialized(); - try { - compFut.get(); - } - catch (IgniteCheckedException e) { - U.error(log, "Error occured during tx rollback.", e); - } + try { + compFut.get(); } + catch (IgniteCheckedException e) { + U.error(log, "Error occurred during tx rollback.", e); + } + } + + /** + * This method allows to roll back the transaction during partition map exchange related to destroying a cache(s). + * Semantically, this method is equivalent to two subsequent calls: + * <pre> + * tx.rollbackAsync(); + * tx.currentPrepareFuture().onDone(new IgniteTxRollbackCheckedException()) + * </pre> + * + * It is assumed that the given transaction did not acquired any locks. + * + * @param tx Transaction. + * @return Rollback future. + */ + private IgniteInternalFuture<IgniteInternalTx> failTxOnPreparing(IgniteInternalTx tx) { + IgniteInternalFuture<IgniteInternalTx> rollbackFut = tx.rollbackAsync(); + + IgniteInternalFuture prepFut = tx.currentPrepareFuture(); + + if (prepFut != null) { + assert prepFut instanceof GridFutureAdapter : + "It is assumed that prepare future should extend GridFutureAdapter class [prepFut=" + prepFut + ']'; + + ((GridFutureAdapter)prepFut).onDone( + new IgniteTxRollbackCheckedException( + "Failed to prepare the transaction, due to the transaction is marked as rolled back " + + "[tx=" + CU.txString(tx) + ']')); + } + + return rollbackFut; } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java index bba2e63..40299ed 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; - import org.apache.ignite.IgniteCacheRestartingException; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; @@ -289,15 +288,18 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { GridCacheContext<?, ?> nonLocCtx = null; + Map<Integer, GridCacheContext> cacheCtxs = U.newHashMap(activeCacheIds.size()); + for (int i = 0; i < activeCacheIds.size(); i++) { int cacheId = activeCacheIds.get(i); GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); if (!cacheCtx.isLocal()) { - nonLocCtx = cacheCtx; + if (nonLocCtx == null) + nonLocCtx = cacheCtx; - break; + cacheCtxs.putIfAbsent(cacheCtx.cacheId(), cacheCtx); } } @@ -306,13 +308,17 @@ public class IgniteTxStateImpl extends IgniteTxLocalStateAdapter { nonLocCtx.topology().readLock(); - if (nonLocCtx.topology().stopping()) { - fut.onDone( - cctx.cache().isCacheRestarting(nonLocCtx.name())? - new IgniteCacheRestartingException(nonLocCtx.name()): - new CacheStoppedException(nonLocCtx.name())); + for (Map.Entry<Integer, GridCacheContext> e : cacheCtxs.entrySet()) { + GridCacheContext activeCacheCtx = e.getValue(); - return null; + if (activeCacheCtx.topology().stopping()) { + fut.onDone( + cctx.cache().isCacheRestarting(activeCacheCtx.name()) ? + new IgniteCacheRestartingException(activeCacheCtx.name()) : + new CacheStoppedException(activeCacheCtx.name())); + + return null; + } } return nonLocCtx.topology().topologyVersionFuture(); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java index 52ec992..948d8c1 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java @@ -17,9 +17,17 @@ package org.apache.ignite.internal.processors.cache.transactions; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.CacheException; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.Ignition; import org.apache.ignite.cache.CacheAtomicityMode; @@ -35,10 +43,14 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.CacheInvalidStateException; import org.apache.ignite.internal.processors.cache.CacheStoppedException; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.GridRandom; +import org.apache.ignite.internal.util.future.GridCompoundFuture; import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteFutureTimeoutException; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.MvccFeatureChecker; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -46,8 +58,12 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionRollbackException; +import org.junit.Assume; import org.junit.Test; +import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; + /** * */ @@ -67,6 +83,9 @@ public class TxOnCachesStopTest extends GridCommonAbstractTest { /** */ private CacheConfiguration<Integer, byte[]> surviveCacheCfg; + /** */ + private static final int CACHE_CNT = 30; + /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); @@ -101,7 +120,7 @@ public class TxOnCachesStopTest extends GridCommonAbstractTest { surviveCacheCfg = ccfg2; - cfg.setCacheConfiguration(ccfg1, ccfg2); + cfg.setCacheConfiguration(destroyCacheCfg, surviveCacheCfg); return cfg; } @@ -132,7 +151,7 @@ public class TxOnCachesStopTest extends GridCommonAbstractTest { */ @Test public void testTxOnCacheStopNoMessageBlock() throws Exception { - testTxOnCacheStop(false); + runTxOnCacheStop(false); } /** @@ -140,13 +159,13 @@ public class TxOnCachesStopTest extends GridCommonAbstractTest { */ @Test public void testTxOnCacheStopWithMessageBlock() throws Exception { - testTxOnCacheStop(true); + runTxOnCacheStop(true); } /** * @param block {@code True} To block GridNearTxPrepareRequest message. */ - public void testTxOnCacheStop(boolean block) throws Exception { + private void runTxOnCacheStop(boolean block) throws Exception { startGridsMultiThreaded(2); Ignition.setClientMode(true); @@ -155,9 +174,10 @@ public class TxOnCachesStopTest extends GridCommonAbstractTest { ig.cluster().active(true); - for (TransactionConcurrency conc : TransactionConcurrency.values()) + for (TransactionConcurrency conc : TransactionConcurrency.values()) { for (TransactionIsolation iso : TransactionIsolation.values()) runTxOnCacheStop(conc, iso, ig, block); + } } /** @@ -173,19 +193,91 @@ public class TxOnCachesStopTest extends GridCommonAbstractTest { ig.cluster().active(true); - for (TransactionConcurrency conc : TransactionConcurrency.values()) + for (TransactionConcurrency conc : TransactionConcurrency.values()) { for (TransactionIsolation iso : TransactionIsolation.values()) runCacheStopInMidTx(conc, iso, ig); + } } /** * @throws Exception If failed. */ - private void runTxOnCacheStop(TransactionConcurrency conc, TransactionIsolation iso, Ignite ig, boolean runConc) - throws Exception { + @Test + public void testOptimisticTxMappedOnPMETopology() throws Exception { + Assume.assumeFalse(MvccFeatureChecker.forcedMvcc()); + + startGridsMultiThreaded(1); + + Ignition.setClientMode(true); + + Ignite client = startGrid("client"); + + client.cluster().active(true); + + awaitPartitionMapExchange(true, true, null); + + final IgniteCache<Integer, byte[]> cache = client.getOrCreateCache(destroyCacheCfg); + final IgniteCache<Integer, byte[]> cache2 = client.getOrCreateCache(surviveCacheCfg); + + final TestRecordingCommunicationSpi srvSpi = TestRecordingCommunicationSpi.spi(grid(0)); + + CountDownLatch destroyLatch = new CountDownLatch(1); + + srvSpi.blockMessages((node, msg) -> (msg instanceof GridDhtPartitionsFullMessage)); + + try (Transaction tx = client.transactions().txStart(OPTIMISTIC, SERIALIZABLE)) { + cache2.put(100, new byte[1024]); + cache.put(100, new byte[1024]); + + GridTestUtils.runAsync(() -> { + grid(0).destroyCache(destroyCacheCfg.getName()); + + destroyLatch.countDown(); + }); + + destroyLatch.await(); + + IgniteFuture commitFut = tx.commitAsync(); + + srvSpi.stopBlock(); + + commitFut.get(10_000); + + fail("Transaction should be rolled back."); + } + catch (IgniteFutureTimeoutException fte) { + srvSpi.stopBlock(); + + fail("Partition map exchange hangs [err=" + fte + ']'); + } + catch (IgniteException e) { + srvSpi.stopBlock(); + + assertTrue(X.hasCause(e, CacheInvalidStateException.class) || X.hasCause(e, IgniteException.class)); + } + } + + /** + * @param conc Concurrency mode. + * @param iso Isolation level. + * @param ig Client node. + * @param runConc {@code true} if a cache should be destroyed concurrently. + * @throws Exception If Failed. + */ + private void runTxOnCacheStop( + TransactionConcurrency conc, + TransactionIsolation iso, + Ignite ig, + boolean runConc + ) throws Exception { if ((conc == TransactionConcurrency.OPTIMISTIC) && (MvccFeatureChecker.forcedMvcc())) return; + if (log.isInfoEnabled()) { + log.info("Starting runTxOnCacheStop " + + "[concurrency=" + conc + ", isolation=" + iso + ", blockPrepareRequests=" + !runConc + ']'); + } + CountDownLatch destroyLatch = new CountDownLatch(1); final IgniteCache<Integer, byte[]> cache = ig.getOrCreateCache(destroyCacheCfg); @@ -255,10 +347,188 @@ public class TxOnCachesStopTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ + @Test + public void testOptimisticTransactionsOnCacheDestroy() throws Exception { + Assume.assumeFalse(MvccFeatureChecker.forcedMvcc()); + + startGridsMultiThreaded(3); + + Ignition.setClientMode(true); + + ArrayList<Ignite> clients = new ArrayList<>(); + for (int ci = 0; ci < 2; ++ci) + clients.add(startGrid("client-" + ci)); + + clients.get(0).cluster().active(true); + + for (TransactionIsolation iso : TransactionIsolation.values()) { + grid(0).getOrCreateCaches(createCacheConfigurations()); + + // Make sure that all caches are started. + awaitPartitionMapExchange(); + + testConcurrentTransactionsOnCacheDestroy(clients, OPTIMISTIC, iso); + + // Make sure that all caches are stopped. + awaitPartitionMapExchange(); + } + } + + /** + * Creates a list of cache configurations. + * + * @return List of cache configurations. + */ + private List<CacheConfiguration> createCacheConfigurations() { + String GRP_NAME = "test-destroy-group"; + + List<CacheConfiguration> cacheCfgs = new ArrayList<>(CACHE_CNT); + + for (int i = 0; i < CACHE_CNT; ++i) { + CacheConfiguration<Integer, byte[]> c = new CacheConfiguration<>("test-cache-" + i); + c.setBackups(2); + c.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + c.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + c.setAffinity(new RendezvousAffinityFunction(false, 32)); + c.setGroupName(GRP_NAME); + + cacheCfgs.add(c); + } + + return cacheCfgs; + } + + /** + * @param clients Client nodes that are used for initiating transactions. + * @param conc Transaction concurrency mode. + * @param iso Transaction isolation. + * @throws Exception If failed. + */ + private void testConcurrentTransactionsOnCacheDestroy( + final ArrayList<Ignite> clients, + TransactionConcurrency conc, + TransactionIsolation iso + ) throws Exception { + if (log.isInfoEnabled()) { + log.info("Starting testConcurrentTransactionsOnCacheDestroy " + + "[concurrency=" + conc + ", isolation=" + iso + ']'); + } + + final AtomicBoolean stopTxLoad = new AtomicBoolean(); + final AtomicInteger cacheIdxToBeDestroyed = new AtomicInteger(-1); + + IgniteInternalFuture txLoadFut = startTxLoad(stopTxLoad, cacheIdxToBeDestroyed, clients, conc, iso); + + try { + for (int i = 0; i < CACHE_CNT; ++i) { + int clientIdx = (i % clients.size()); + + IgniteInternalFuture destFut = GridTestUtils.runAsync(() -> + clients.get(clientIdx).destroyCache("test-cache-" + cacheIdxToBeDestroyed.incrementAndGet()) + ); + + try { + destFut.get(15, TimeUnit.SECONDS); + } + catch (IgniteCheckedException e) { + fail("Looks like PME hangs [err=" + e + ']'); + } + } + } + catch (Throwable t) { + fail("Unexpected error [err=" + t + ']'); + } + + stopTxLoad.set(true); + + txLoadFut.get(); + } + + /** + * Starts transactional load. + * + * @param stopTxLoad Boolean flag that is used to stop transactional load. + * @param cacheIdxToBeDestroyed Variable that allows to get an index of destroyed cache. + * @param clients Client nodes that are used for initiating transactions. + * @param concurrency Transaction concurrency mode. + * @param isolation Transaction isolation. + * @return TxLoad future. + */ + private IgniteInternalFuture startTxLoad ( + final AtomicBoolean stopTxLoad, + final AtomicInteger cacheIdxToBeDestroyed, + final List<Ignite> clients, + TransactionConcurrency concurrency, + TransactionIsolation isolation){ + final GridCompoundFuture fut = new GridCompoundFuture(); + + for (Ignite c : clients) { + for (int i = 0; i < CACHE_CNT; ++i) + c.getOrCreateCache("test-cache-" + i); + } + + clients.forEach(c -> { + fut.add(GridTestUtils.runAsync(() -> { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + List<IgniteCache<Integer, byte[]>> caches = new ArrayList<>(); + + for (int i = 0; i < CACHE_CNT; ++i) { + IgniteCache<Integer, byte[]> testCache = c.cache("test-cache-" + i); + + if (testCache == null) { + throw new IllegalStateException( + "Cache test-cache-" + i + " is not started " + + "on client node " + c.configuration().getIgniteInstanceName()); + } + + caches.add(testCache); + } + + byte[] val = new byte[128]; + + while (!stopTxLoad.get()) { + try (Transaction tx = c.transactions().txStart(concurrency, isolation)) { + int cacheIdx = cacheIdxToBeDestroyed.get(); + + caches.get(Math.max(0, cacheIdx)).put(rnd.nextInt(), val); + caches.get(rnd.nextInt(Math.min(cacheIdx + 1, caches.size() - 1), caches.size())).put(rnd.nextInt(), val); + + doSleep(200); + + tx.commit(); + } + // Expected exceptions: + catch (TransactionRollbackException | CacheException e) { + // Failed to prepare the transaction (transaction is marked as rolled back). + if (!X.hasCause(e, TransactionRollbackException.class)) + throw e; + } + catch (IgniteException | IllegalStateException e) { + // Failed to perform cache operation (cache is stopped). + } + } + }, "tx-load-" + c.configuration().getIgniteInstanceName())); + }); + + fut.markInitialized(); + + return fut; + } + + /** + * @param conc Concurrency mode. + * @param iso Isolation level. + * @param ig Client node. + * @throws Exception If failed. + */ private void runCacheStopInMidTx(TransactionConcurrency conc, TransactionIsolation iso, Ignite ig) throws Exception { if ((conc == TransactionConcurrency.OPTIMISTIC) && (MvccFeatureChecker.forcedMvcc())) return; + if (log.isInfoEnabled()) + log.info("Starting runCacheStopInMidTx [concurrency=" + conc + ", isolation=" + iso + ']'); + CountDownLatch destroyLatch = new CountDownLatch(1); CountDownLatch putLatch = new CountDownLatch(1); @@ -303,7 +573,7 @@ public class TxOnCachesStopTest extends GridCommonAbstractTest { e.printStackTrace(); } - }); + }, "tx-load-thread"); f1.get(); f0.get();