This is an automated email from the ASF dual-hosted git repository.

irakov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 83f3baa  IGNITE-12076 Fixed PME hang on client node caused by optimistic transactions and cache destroying - Fixes #6779.
83f3baa is described below

commit 83f3baa60f7faf031279ec43772de7f3c1c8c769
Author: Slava Koptilin <slava.kopti...@gmail.com>
AuthorDate: Fri Aug 30 20:28:31 2019 +0300

    IGNITE-12076 Fixed PME hang on client node caused by optimistic transactions and cache destroying - Fixes #6779.
    
    Signed-off-by: Ivan Rakov <ira...@apache.org>
---
 .../processors/cache/GridCacheContext.java         |  42 +--
 .../processors/cache/GridCacheEventManager.java    |  14 +-
 .../processors/cache/GridCacheProcessor.java       |  31 +--
 .../processors/cache/GridCacheProxyImpl.java       |   6 +-
 ...dNearOptimisticSerializableTxPrepareFuture.java |  10 +
 .../near/GridNearOptimisticTxPrepareFuture.java    |   8 +
 .../GridNearOptimisticTxPrepareFutureAdapter.java  |  18 ++
 .../cache/transactions/IgniteTxManager.java        |  71 +++--
 .../cache/transactions/IgniteTxStateImpl.java      |  24 +-
 .../cache/transactions/TxOnCachesStopTest.java     | 288 ++++++++++++++++++++-
 10 files changed, 425 insertions(+), 87 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 9d52c75..9ddafb2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -675,14 +675,14 @@ public class GridCacheContext<K, V> implements 
Externalizable {
      * @return {@code True} if cache is replicated cache.
      */
     public boolean isReplicated() {
-        return cacheCfg.getCacheMode() == CacheMode.REPLICATED;
+        return config().getCacheMode() == CacheMode.REPLICATED;
     }
 
     /**
      * @return {@code True} if cache is partitioned cache.
      */
     public boolean isPartitioned() {
-        return cacheCfg.getCacheMode() == CacheMode.PARTITIONED;
+        return config().getCacheMode() == CacheMode.PARTITIONED;
     }
 
     /**
@@ -696,7 +696,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
      * @return {@code True} in case cache supports query.
      */
     public boolean isQueryEnabled() {
-        return !F.isEmpty(cacheCfg.getQueryEntities());
+        return !F.isEmpty(config().getQueryEntities());
     }
 
     /**
@@ -849,21 +849,23 @@ public class GridCacheContext<K, V> implements 
Externalizable {
      * @return {@code True} if atomic.
      */
     public boolean atomic() {
-        return cacheCfg.getAtomicityMode() == ATOMIC;
+        return config().getAtomicityMode() == ATOMIC;
     }
 
     /**
      * @return {@code True} if transactional.
      */
     public boolean transactional() {
-        return cacheCfg.getAtomicityMode() == TRANSACTIONAL || 
cacheCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;
+        CacheConfiguration cfg = config();
+
+        return cfg.getAtomicityMode() == TRANSACTIONAL || 
cfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;
     }
 
     /**
      * @return {@code True} if transactional snapshot.
      */
     public boolean transactionalSnapshot() {
-        return cacheCfg.getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;
+        return config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;
     }
 
     /**
@@ -1043,9 +1045,15 @@ public class GridCacheContext<K, V> implements 
Externalizable {
 
     /**
      * @return Cache configuration for given cache instance.
+     * @throws IllegalStateException If this cache context was cleaned up.
      */
     public CacheConfiguration config() {
-        return cacheCfg;
+        CacheConfiguration res = cacheCfg;
+
+        if (res == null)
+            throw new IllegalStateException((new 
CacheStoppedException(name())));
+
+        return res;
     }
 
     /**
@@ -1054,7 +1062,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
      *      are set to {@code true} or the store is local.
      */
     public boolean writeToStoreFromDht() {
-        return store().isLocal() || cacheCfg.isWriteBehindEnabled();
+        return store().isLocal() || config().isWriteBehindEnabled();
     }
 
     /**
@@ -1488,56 +1496,56 @@ public class GridCacheContext<K, V> implements 
Externalizable {
      * @return {@code True} if store read-through mode is enabled.
      */
     public boolean readThrough() {
-        return cacheCfg.isReadThrough() && !skipStore();
+        return config().isReadThrough() && !skipStore();
     }
 
     /**
      * @return {@code True} if store and read-through mode are enabled in 
configuration.
      */
     public boolean readThroughConfigured() {
-        return store().configured() && cacheCfg.isReadThrough();
+        return store().configured() && config().isReadThrough();
     }
 
     /**
      * @return {@code True} if {@link 
CacheConfiguration#isLoadPreviousValue()} flag is set.
      */
     public boolean loadPreviousValue() {
-        return cacheCfg.isLoadPreviousValue();
+        return config().isLoadPreviousValue();
     }
 
     /**
      * @return {@code True} if store write-through is enabled.
      */
     public boolean writeThrough() {
-        return cacheCfg.isWriteThrough() && !skipStore();
+        return config().isWriteThrough() && !skipStore();
     }
 
     /**
      * @return {@code True} if invalidation is enabled.
      */
     public boolean isInvalidate() {
-        return cacheCfg.isInvalidate();
+        return config().isInvalidate();
     }
 
     /**
      * @return {@code True} if synchronous commit is enabled.
      */
     public boolean syncCommit() {
-        return cacheCfg.getWriteSynchronizationMode() == FULL_SYNC;
+        return config().getWriteSynchronizationMode() == FULL_SYNC;
     }
 
     /**
      * @return {@code True} if synchronous rollback is enabled.
      */
     public boolean syncRollback() {
-        return cacheCfg.getWriteSynchronizationMode() == FULL_SYNC;
+        return config().getWriteSynchronizationMode() == FULL_SYNC;
     }
 
     /**
      * @return {@code True} if only primary node should be updated 
synchronously.
      */
     public boolean syncPrimary() {
-        return cacheCfg.getWriteSynchronizationMode() == PRIMARY_SYNC;
+        return config().getWriteSynchronizationMode() == PRIMARY_SYNC;
     }
 
     /**
@@ -1753,7 +1761,7 @@ public class GridCacheContext<K, V> implements 
Externalizable {
      * of {@link CacheConfiguration#isCopyOnRead()}.
      */
     public boolean needValueCopy() {
-        return affNode && cacheCfg.isCopyOnRead();
+        return affNode && config().isCopyOnRead();
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index 501da08..c095ebe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache;
 import java.util.Collection;
 import java.util.UUID;
 import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.events.CacheEvent;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -388,11 +389,18 @@ public class GridCacheEventManager extends 
GridCacheManagerAdapter {
         GridCacheContext cctx0 = cctx;
 
         // Event recording is impossible in recovery mode.
-        if (cctx0 != null && cctx0.kernalContext().recoveryMode())
+        if (cctx0 == null || cctx0.kernalContext().recoveryMode())
             return false;
 
-        return cctx0 != null && cctx0.userCache() && 
cctx0.gridEvents().isRecordable(type)
-            && cctx0.config() != null && !cctx0.config().isEventsDisabled();
+        try {
+            CacheConfiguration cfg = cctx0.config();
+
+            return cctx0.userCache() && cctx0.gridEvents().isRecordable(type) 
&& !cfg.isEventsDisabled();
+        }
+        catch (IllegalStateException e) {
+            // Cache context was cleaned up.
+            return false;
+        }
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index 44a985c..7d4e4bd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -2691,6 +2691,8 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                             for (ExchangeActions.CacheActionData action: 
cachesToStopByGrp.getValue()) {
                                 stopGateway(action.request());
 
+                                
context().tm().rollbackTransactionsForStoppingCache(action.descriptor().cacheId());
+
                                 sharedCtx.database().checkpointReadLock();
 
                                 try {
@@ -2829,40 +2831,11 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
                 ((GridServiceProcessor)ctx.service()).updateUtilityCache();
         }
 
-        rollbackCoveredTx(exchActions);
-
         if (err == null)
             processCacheStopRequestOnExchangeDone(exchActions);
     }
 
     /**
-     * Rollback tx covered by stopped caches.
-     *
-     * @param exchActions Change requests.
-     */
-    private void rollbackCoveredTx(ExchangeActions exchActions) {
-        if (!exchActions.cacheGroupsToStop().isEmpty() || 
!exchActions.cacheStopRequests().isEmpty()) {
-            Set<Integer> cachesToStop = new HashSet<>();
-
-            for (ExchangeActions.CacheGroupActionData act : 
exchActions.cacheGroupsToStop()) {
-                @Nullable CacheGroupContext grpCtx = 
context().cache().cacheGroup(act.descriptor().groupId());
-
-                if (grpCtx != null && grpCtx.sharedGroup())
-                    cachesToStop.addAll(grpCtx.cacheIds());
-            }
-
-            for (ExchangeActions.CacheActionData act : 
exchActions.cacheStopRequests())
-                cachesToStop.add(act.descriptor().cacheId());
-
-            if (!cachesToStop.isEmpty()) {
-                IgniteTxManager tm = context().tm();
-
-                tm.rollbackTransactionsForCaches(cachesToStop);
-            }
-        }
-    }
-
-    /**
      * @param grpId Group ID.
      */
     private void stopCacheGroup(int grpId) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
index 6fd4269..eee17de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProxyImpl.java
@@ -104,7 +104,11 @@ public class GridCacheProxyImpl<K, V> implements 
IgniteInternalCache<K, V>, Exte
 
         gate = ctx.gate();
 
-        aff = new GridCacheAffinityProxy<>(ctx, ctx.cache().affinity());
+        GridCacheAdapter adapter = ctx.cache();
+        if (adapter == null)
+            throw new IllegalStateException(new 
CacheStoppedException(ctx.name()));
+
+        aff = new GridCacheAffinityProxy<>(ctx, adapter.affinity());
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
index 46bab86..1a7cfb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java
@@ -42,6 +42,7 @@ import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import 
org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -51,6 +52,7 @@ import org.apache.ignite.internal.util.typedef.C1;
 import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
@@ -969,6 +971,14 @@ public class GridNearOptimisticSerializableTxPrepareFuture 
extends GridNearOptim
          * @param res Response.
          */
         private void remap(final GridNearTxPrepareResponse res) {
+            if (parent.tx.isRollbackOnly()) {
+                onDone(new IgniteTxRollbackCheckedException(
+                    "Failed to prepare the transaction, due to the transaction 
is marked as rolled back " +
+                        "[tx=" + CU.txString(parent.tx) + ']'));
+
+                return;
+            }
+
             parent.prepareOnTopology(true, new Runnable() {
                 @Override public void run() {
                     onDone(res);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
index 140f593..791e018 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java
@@ -1013,6 +1013,14 @@ public class GridNearOptimisticTxPrepareFuture extends 
GridNearOptimisticTxPrepa
          *
          */
         private void remap() {
+            if (parent.tx.isRollbackOnly()) {
+                onDone(new IgniteTxRollbackCheckedException(
+                    "Failed to prepare the transaction, due to the transaction 
is marked as rolled back " +
+                        "[tx=" + CU.txString(parent.tx) + ']'));
+
+                return;
+            }
+
             parent.prepareOnTopology(true, new Runnable() {
                 @Override public void run() {
                     onDone((GridNearTxPrepareResponse) null);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
index 13b03fe..fb62880 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFutureAdapter.java
@@ -25,11 +25,13 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
+import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.jetbrains.annotations.Nullable;
 
@@ -178,6 +180,14 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
                 return;
             }
 
+            if (tx.isRollbackOnly()) {
+                onDone(new IgniteTxRollbackCheckedException(
+                    "Failed to prepare the transaction, due to the transaction 
is marked as rolled back " +
+                        "[tx=" + CU.txString(tx) + ']'));
+
+                return;
+            }
+
             prepare0(remap, false);
 
             if (c != null)
@@ -189,6 +199,14 @@ public abstract class 
GridNearOptimisticTxPrepareFutureAdapter extends GridNearT
                     return;
 
                 try {
+                    if (tx.isRollbackOnly()) {
+                        onDone(new IgniteTxRollbackCheckedException(
+                            "Failed to prepare the transaction, due to the 
transaction is marked as rolled back " +
+                                "[tx=" + CU.txString(tx) + ']'));
+
+                        return;
+                    }
+
                     prepareOnTopology(remap, c);
                 }
                 finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 6ff8b77..d73dc39 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -79,6 +79,7 @@ import 
org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.cluster.BaselineTopology;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
 import 
org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException;
+import 
org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridBoundedConcurrentOrderedMap;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
@@ -327,35 +328,67 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
     }
 
     /**
-     * @param cachesToStop Caches to stop.
+     * @param cacheToStop Cache to stop.
      */
-    public void rollbackTransactionsForCaches(Set<Integer> cachesToStop) {
-        if (!cachesToStop.isEmpty()) {
-            IgniteTxManager tm = context().tm();
+    public void rollbackTransactionsForStoppingCache(int cacheToStop) {
+        GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> compFut = new 
GridCompoundFuture<>();
 
-            Collection<IgniteInternalTx> active = tm.activeTransactions();
+        Collection<IgniteInternalTx> active = activeTransactions();
 
-            GridCompoundFuture<IgniteInternalTx, IgniteInternalTx> compFut = 
new GridCompoundFuture<>();
+        for (IgniteInternalTx tx : active) {
+            IgniteTxState state = tx.txState();
 
-            for (IgniteInternalTx tx : active) {
-                for (IgniteTxEntry e : tx.allEntries()) {
-                    if (cachesToStop.contains(e.context().cacheId())) {
-                        compFut.add(tx.rollbackAsync());
+            Collection<IgniteTxEntry> txEntries =
+                state instanceof IgniteTxStateImpl ? 
((IgniteTxStateImpl)state).allEntriesCopy() : state.allEntries();
 
-                        break;
-                    }
+            for (IgniteTxEntry e : txEntries) {
+                if (e.context().cacheId() == cacheToStop) {
+                    compFut.add(failTxOnPreparing(tx));
+
+                    break;
                 }
             }
+        }
 
-            compFut.markInitialized();
+        compFut.markInitialized();
 
-            try {
-                compFut.get();
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Error occured during tx rollback.", e);
-            }
+        try {
+            compFut.get();
         }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Error occurred during tx rollback.", e);
+        }
+    }
+
+    /**
+     * This method allows to roll back the transaction during partition map 
exchange related to destroying a cache(s).
+     * Semantically, this method is equivalent to two subsequent calls:
+     * <pre>
+     *     tx.rollbackAsync();
+     *     tx.currentPrepareFuture().onDone(new 
IgniteTxRollbackCheckedException())
+     * </pre>
+     *
+     * It is assumed that the given transaction did not acquired any locks.
+     *
+     * @param tx Transaction.
+     * @return Rollback future.
+     */
+    private IgniteInternalFuture<IgniteInternalTx> 
failTxOnPreparing(IgniteInternalTx tx) {
+        IgniteInternalFuture<IgniteInternalTx> rollbackFut = 
tx.rollbackAsync();
+
+        IgniteInternalFuture prepFut = tx.currentPrepareFuture();
+
+        if (prepFut != null) {
+            assert prepFut instanceof GridFutureAdapter :
+                "It is assumed that prepare future should extend 
GridFutureAdapter class [prepFut=" + prepFut + ']';
+
+            ((GridFutureAdapter)prepFut).onDone(
+                new IgniteTxRollbackCheckedException(
+                    "Failed to prepare the transaction, due to the transaction 
is marked as rolled back " +
+                        "[tx=" + CU.txString(tx) + ']'));
+        }
+
+        return rollbackFut;
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
index bba2e63..40299ed 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxStateImpl.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
-
 import org.apache.ignite.IgniteCacheRestartingException;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
@@ -289,15 +288,18 @@ public class IgniteTxStateImpl extends 
IgniteTxLocalStateAdapter {
 
         GridCacheContext<?, ?> nonLocCtx = null;
 
+        Map<Integer, GridCacheContext> cacheCtxs = 
U.newHashMap(activeCacheIds.size());
+
         for (int i = 0; i < activeCacheIds.size(); i++) {
             int cacheId = activeCacheIds.get(i);
 
             GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId);
 
             if (!cacheCtx.isLocal()) {
-                nonLocCtx = cacheCtx;
+                if (nonLocCtx == null)
+                    nonLocCtx = cacheCtx;
 
-                break;
+                cacheCtxs.putIfAbsent(cacheCtx.cacheId(), cacheCtx);
             }
         }
 
@@ -306,13 +308,17 @@ public class IgniteTxStateImpl extends 
IgniteTxLocalStateAdapter {
 
         nonLocCtx.topology().readLock();
 
-        if (nonLocCtx.topology().stopping()) {
-            fut.onDone(
-                cctx.cache().isCacheRestarting(nonLocCtx.name())?
-                    new IgniteCacheRestartingException(nonLocCtx.name()):
-                    new CacheStoppedException(nonLocCtx.name()));
+        for (Map.Entry<Integer, GridCacheContext> e : cacheCtxs.entrySet()) {
+            GridCacheContext activeCacheCtx = e.getValue();
 
-            return null;
+            if (activeCacheCtx.topology().stopping()) {
+                fut.onDone(
+                    cctx.cache().isCacheRestarting(activeCacheCtx.name()) ?
+                        new 
IgniteCacheRestartingException(activeCacheCtx.name()) :
+                        new CacheStoppedException(activeCacheCtx.name()));
+
+                return null;
+            }
         }
 
         return nonLocCtx.topology().topologyVersionFuture();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java
index 52ec992..948d8c1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/TxOnCachesStopTest.java
@@ -17,9 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.transactions;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.Ignition;
 import org.apache.ignite.cache.CacheAtomicityMode;
@@ -35,10 +43,14 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.processors.cache.CacheInvalidStateException;
 import org.apache.ignite.internal.processors.cache.CacheStoppedException;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareRequest;
 import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException;
 import org.apache.ignite.internal.util.GridRandom;
+import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteFutureTimeoutException;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.MvccFeatureChecker;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
@@ -46,8 +58,12 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionRollbackException;
+import org.junit.Assume;
 import org.junit.Test;
 
+import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE;
+
 /**
  *
  */
@@ -67,6 +83,9 @@ public class TxOnCachesStopTest extends 
GridCommonAbstractTest {
     /** */
     private CacheConfiguration<Integer, byte[]> surviveCacheCfg;
 
+    /** */
+    private static final int CACHE_CNT = 30;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -101,7 +120,7 @@ public class TxOnCachesStopTest extends 
GridCommonAbstractTest {
 
         surviveCacheCfg = ccfg2;
 
-        cfg.setCacheConfiguration(ccfg1, ccfg2);
+        cfg.setCacheConfiguration(destroyCacheCfg, surviveCacheCfg);
 
         return cfg;
     }
@@ -132,7 +151,7 @@ public class TxOnCachesStopTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testTxOnCacheStopNoMessageBlock() throws Exception {
-        testTxOnCacheStop(false);
+        runTxOnCacheStop(false);
     }
 
     /**
@@ -140,13 +159,13 @@ public class TxOnCachesStopTest extends 
GridCommonAbstractTest {
      */
     @Test
     public void testTxOnCacheStopWithMessageBlock() throws Exception {
-        testTxOnCacheStop(true);
+        runTxOnCacheStop(true);
     }
 
     /**
      * @param block {@code True} To block GridNearTxPrepareRequest message.
      */
-    public void testTxOnCacheStop(boolean block) throws Exception {
+    private void runTxOnCacheStop(boolean block) throws Exception {
         startGridsMultiThreaded(2);
 
         Ignition.setClientMode(true);
@@ -155,9 +174,10 @@ public class TxOnCachesStopTest extends 
GridCommonAbstractTest {
 
         ig.cluster().active(true);
 
-        for (TransactionConcurrency conc : TransactionConcurrency.values())
+        for (TransactionConcurrency conc : TransactionConcurrency.values()) {
             for (TransactionIsolation iso : TransactionIsolation.values())
                 runTxOnCacheStop(conc, iso, ig, block);
+        }
     }
 
     /**
@@ -173,19 +193,91 @@ public class TxOnCachesStopTest extends 
GridCommonAbstractTest {
 
         ig.cluster().active(true);
 
-        for (TransactionConcurrency conc : TransactionConcurrency.values())
+        for (TransactionConcurrency conc : TransactionConcurrency.values()) {
             for (TransactionIsolation iso : TransactionIsolation.values())
                 runCacheStopInMidTx(conc, iso, ig);
+        }
     }
 
     /**
      * @throws Exception If failed.
      */
-    private void runTxOnCacheStop(TransactionConcurrency conc, 
TransactionIsolation iso, Ignite ig, boolean runConc)
-        throws Exception {
+    @Test
+    public void testOptimisticTxMappedOnPMETopology() throws Exception {
+        Assume.assumeFalse(MvccFeatureChecker.forcedMvcc());
+
+        startGridsMultiThreaded(1);
+
+        Ignition.setClientMode(true);
+
+        Ignite client = startGrid("client");
+
+        client.cluster().active(true);
+
+        awaitPartitionMapExchange(true, true, null);
+
+        final IgniteCache<Integer, byte[]> cache = 
client.getOrCreateCache(destroyCacheCfg);
+        final IgniteCache<Integer, byte[]> cache2 = 
client.getOrCreateCache(surviveCacheCfg);
+
+        final TestRecordingCommunicationSpi srvSpi = 
TestRecordingCommunicationSpi.spi(grid(0));
+
+        CountDownLatch destroyLatch = new CountDownLatch(1);
+
+        srvSpi.blockMessages((node, msg) -> (msg instanceof 
GridDhtPartitionsFullMessage));
+
+        try (Transaction tx = client.transactions().txStart(OPTIMISTIC, 
SERIALIZABLE)) {
+            cache2.put(100, new byte[1024]);
+            cache.put(100, new byte[1024]);
+
+            GridTestUtils.runAsync(() -> {
+                grid(0).destroyCache(destroyCacheCfg.getName());
+
+                destroyLatch.countDown();
+            });
+
+            destroyLatch.await();
+
+            IgniteFuture commitFut = tx.commitAsync();
+
+            srvSpi.stopBlock();
+
+            commitFut.get(10_000);
+
+            fail("Transaction should be rolled back.");
+        }
+        catch (IgniteFutureTimeoutException fte) {
+            srvSpi.stopBlock();
+
+            fail("Partition map exchange hangs [err=" + fte + ']');
+        }
+        catch (IgniteException e) {
+            srvSpi.stopBlock();
+
+            assertTrue(X.hasCause(e, CacheInvalidStateException.class) || 
X.hasCause(e, IgniteException.class));
+        }
+    }
+
+    /**
+     * @param conc Concurrency mode.
+     * @param iso Isolation level.
+     * @param ig Client node.
+     * @param runConc {@code true} if a cache should be destroyed concurrently.
+     * @throws Exception If Failed.
+     */
+    private void runTxOnCacheStop(
+        TransactionConcurrency conc,
+        TransactionIsolation iso,
+        Ignite ig,
+        boolean runConc
+    ) throws Exception {
         if ((conc == TransactionConcurrency.OPTIMISTIC) && 
(MvccFeatureChecker.forcedMvcc()))
             return;
 
+        if (log.isInfoEnabled()) {
+            log.info("Starting runTxOnCacheStop " +
+                "[concurrency=" + conc + ", isolation=" + iso + ", 
blockPrepareRequests=" + !runConc + ']');
+        }
+
         CountDownLatch destroyLatch = new CountDownLatch(1);
 
         final IgniteCache<Integer, byte[]> cache = 
ig.getOrCreateCache(destroyCacheCfg);
@@ -255,10 +347,188 @@ public class TxOnCachesStopTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
+    @Test
+    public void testOptimisticTransactionsOnCacheDestroy() throws Exception {
+        // The scenario is skipped when MVCC mode is forced.
+        Assume.assumeFalse(MvccFeatureChecker.forcedMvcc());
+
+        startGridsMultiThreaded(3);
+
+        Ignition.setClientMode(true);
+
+        final int clientCnt = 2;
+
+        ArrayList<Ignite> clientNodes = new ArrayList<>();
+
+        for (int idx = 0; idx < clientCnt; idx++)
+            clientNodes.add(startGrid("client-" + idx));
+
+        Ignite firstClient = clientNodes.get(0);
+
+        firstClient.cluster().active(true);
+
+        for (TransactionIsolation isolation : TransactionIsolation.values()) {
+            // The caches destroyed by the previous iteration are created again here.
+            grid(0).getOrCreateCaches(createCacheConfigurations());
+
+            // Wait until all caches are started.
+            awaitPartitionMapExchange();
+
+            testConcurrentTransactionsOnCacheDestroy(clientNodes, OPTIMISTIC, isolation);
+
+            // Wait until all caches are stopped.
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /**
+     * Builds configurations for {@code CACHE_CNT} transactional caches named {@code test-cache-<index>}.
+     * All of them are assigned to the same cache group.
+     *
+     * @return List of cache configurations.
+     */
+    private List<CacheConfiguration> createCacheConfigurations() {
+        final String grpName = "test-destroy-group";
+
+        List<CacheConfiguration> res = new ArrayList<>(CACHE_CNT);
+
+        for (int idx = 0; idx < CACHE_CNT; idx++) {
+            CacheConfiguration<Integer, byte[]> ccfg = new CacheConfiguration<>("test-cache-" + idx);
+
+            ccfg.setGroupName(grpName);
+            ccfg.setAffinity(new RendezvousAffinityFunction(false, 32));
+            ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            ccfg.setBackups(2);
+
+            res.add(ccfg);
+        }
+
+        return res;
+    }
+
+    /**
+     * Destroys the test caches one by one while transactional load is running on the given client nodes.
+     * Each destroy is awaited for up to 15 seconds; the test fails if that wait ends with
+     * an {@code IgniteCheckedException}. The transactional load is always asked to stop before
+     * this method returns or throws.
+     *
+     * @param clients Client nodes that are used for initiating transactions.
+     * @param conc Transaction concurrency mode.
+     * @param iso Transaction isolation.
+     * @throws Exception If failed.
+     */
+    private void testConcurrentTransactionsOnCacheDestroy(
+        final ArrayList<Ignite> clients,
+        TransactionConcurrency conc,
+        TransactionIsolation iso
+    ) throws Exception {
+        if (log.isInfoEnabled()) {
+            log.info("Starting testConcurrentTransactionsOnCacheDestroy " +
+                "[concurrency=" + conc + ", isolation=" + iso + ']');
+        }
+
+        final AtomicBoolean stopTxLoad = new AtomicBoolean();
+        final AtomicInteger cacheIdxToBeDestroyed = new AtomicInteger(-1);
+
+        IgniteInternalFuture txLoadFut = startTxLoad(stopTxLoad, cacheIdxToBeDestroyed, clients, conc, iso);
+
+        try {
+            for (int i = 0; i < CACHE_CNT; ++i) {
+                // Destroy requests are spread over the clients in a round-robin manner.
+                int clientIdx = (i % clients.size());
+
+                IgniteInternalFuture destFut = GridTestUtils.runAsync(() ->
+                    clients.get(clientIdx).destroyCache("test-cache-" + cacheIdxToBeDestroyed.incrementAndGet())
+                );
+
+                try {
+                    destFut.get(15, TimeUnit.SECONDS);
+                }
+                catch (IgniteCheckedException e) {
+                    fail("Looks like PME hangs [err=" + e + ']');
+                }
+            }
+        }
+        catch (AssertionError e) {
+            // Already a test failure with a meaningful message: do not wrap it into another failure.
+            throw e;
+        }
+        catch (Throwable t) {
+            fail("Unexpected error [err=" + t + ']');
+        }
+        finally {
+            // Stop the load threads on failure as well, otherwise they keep running after the test has failed.
+            stopTxLoad.set(true);
+        }
+
+        txLoadFut.get();
+    }
+
+    /**
+     * Launches one background thread per client node. Each thread keeps running transactions
+     * that put into two of the test caches until the stop flag is raised.
+     *
+     * @param stopTxLoad Flag that stops the transactional load once it is set to {@code true}.
+     * @param cacheIdxToBeDestroyed Holder of the index of the cache that is being destroyed.
+     * @param clients Client nodes that are used for initiating transactions.
+     * @param concurrency Transaction concurrency mode.
+     * @param isolation Transaction isolation.
+     * @return Compound future that aggregates the futures of all load threads.
+     */
+    private IgniteInternalFuture startTxLoad(
+        final AtomicBoolean stopTxLoad,
+        final AtomicInteger cacheIdxToBeDestroyed,
+        final List<Ignite> clients,
+        TransactionConcurrency concurrency,
+        TransactionIsolation isolation
+    ) {
+        final GridCompoundFuture loadFut = new GridCompoundFuture();
+
+        // Every client gets (or creates) all test caches before the load begins.
+        for (Ignite client : clients) {
+            for (int idx = 0; idx < CACHE_CNT; idx++)
+                client.getOrCreateCache("test-cache-" + idx);
+        }
+
+        for (Ignite client : clients) {
+            loadFut.add(GridTestUtils.runAsync(() -> {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                List<IgniteCache<Integer, byte[]>> testCaches = new ArrayList<>();
+
+                for (int idx = 0; idx < CACHE_CNT; idx++) {
+                    IgniteCache<Integer, byte[]> testCache = client.cache("test-cache-" + idx);
+
+                    if (testCache == null) {
+                        throw new IllegalStateException(
+                            "Cache test-cache-" + idx + " is not started " +
+                                "on client node " + client.configuration().getIgniteInstanceName());
+                    }
+
+                    testCaches.add(testCache);
+                }
+
+                byte[] payload = new byte[128];
+
+                while (!stopTxLoad.get()) {
+                    try (Transaction tx = client.transactions().txStart(concurrency, isolation)) {
+                        int destroyIdx = cacheIdxToBeDestroyed.get();
+
+                        // First put: the cache with the current destroy index (index 0 while it is still -1).
+                        IgniteCache<Integer, byte[]> firstCache = testCaches.get(Math.max(0, destroyIdx));
+
+                        firstCache.put(rnd.nextInt(), payload);
+
+                        // Second put: a random cache with a greater index (the last cache if there is none).
+                        int lastIdx = testCaches.size() - 1;
+                        int fromIdx = Math.min(destroyIdx + 1, lastIdx);
+
+                        IgniteCache<Integer, byte[]> secondCache =
+                            testCaches.get(rnd.nextInt(fromIdx, testCaches.size()));
+
+                        secondCache.put(rnd.nextInt(), payload);
+
+                        doSleep(200);
+
+                        tx.commit();
+                    }
+                    // Expected exceptions:
+                    catch (TransactionRollbackException | CacheException e) {
+                        // Failed to prepare the transaction (transaction is marked as rolled back).
+                        boolean rolledBack = X.hasCause(e, TransactionRollbackException.class);
+
+                        if (!rolledBack)
+                            throw e;
+                    }
+                    catch (IgniteException | IllegalStateException e) {
+                        // Failed to perform cache operation (cache is stopped).
+                    }
+                }
+            }, "tx-load-" + client.configuration().getIgniteInstanceName()));
+        }
+
+        loadFut.markInitialized();
+
+        return loadFut;
+    }
+
+    /**
+     * @param conc Concurrency mode.
+     * @param iso Isolation level.
+     * @param ig Client node.
+     * @throws Exception If failed.
+     */
     private void runCacheStopInMidTx(TransactionConcurrency conc, 
TransactionIsolation iso, Ignite ig) throws Exception {
         if ((conc == TransactionConcurrency.OPTIMISTIC) && 
(MvccFeatureChecker.forcedMvcc()))
             return;
 
+        if (log.isInfoEnabled())
+            log.info("Starting runCacheStopInMidTx [concurrency=" + conc + ", 
isolation=" + iso + ']');
+
         CountDownLatch destroyLatch = new CountDownLatch(1);
 
         CountDownLatch putLatch = new CountDownLatch(1);
@@ -303,7 +573,7 @@ public class TxOnCachesStopTest extends 
GridCommonAbstractTest {
                 e.printStackTrace();
             }
 
-        });
+        }, "tx-load-thread");
 
         f1.get();
         f0.get();

Reply via email to