This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch ignite-2.8.1
in repository https://gitbox.apache.org/repos/asf/ignite.git

commit 2158e77c5995d995e030cc05642456bfffbe99c3
Author: Aleksey Plekhanov <plehanov.a...@gmail.com>
AuthorDate: Thu Apr 30 11:16:29 2020 +0300

    IGNITE-12855 Fixed failure with concurrent get operation and entry expiration - Fixes #7609.
    
    Signed-off-by: Aleksey Plekhanov <plehanov.a...@gmail.com>
    (cherry picked from commit ed1e06fbc54a9f2d8055e59d805fdd618630e876)
---
 .../processors/cache/GridCacheAdapter.java         |   5 +
 .../cache/distributed/dht/GridDhtCacheAdapter.java |   2 +-
 .../distributed/dht/atomic/GridDhtAtomicCache.java |   5 +
 .../dht/colocated/GridDhtColocatedCache.java       |   5 +
 .../cache/local/atomic/GridLocalAtomicCache.java   | 225 +++++++++++----------
 .../cache/persistence/GridCacheOffheapManager.java |  20 +-
 .../cache/transactions/IgniteTxManager.java        |  16 +-
 .../IgnitePdsWithTtlDeactivateOnHighloadTest.java  |  96 ++++++---
 8 files changed, 218 insertions(+), 156 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 096ee61..7acc554 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -2055,6 +2055,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             final AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() :
                 tx.topologyVersion();
 
+            ctx.shared().database().checkpointReadLock();
+
             try {
                 int keysSize = keys.size();
 
@@ -2377,6 +2379,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             catch (IgniteCheckedException e) {
                 return new GridFinishedFuture<>(e);
             }
+            finally {
+                ctx.shared().database().checkpointReadUnlock();
+            }
         }
         else {
             return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 41131f8..7969e69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1108,7 +1108,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
      * @param expiryPlc Expiry policy.
      */
     public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy expiryPlc) {
-        if (expiryPlc != null && expiryPlc.entries() != null) {
+        if (expiryPlc != null && !F.isEmpty(expiryPlc.entries())) {
             ctx.closures().runLocalSafe(new Runnable() {
                 @SuppressWarnings({"ForLoopReplaceableByForEach"})
                 @Override public void run() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 9f8271a..56217ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -1508,6 +1508,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
 
         // Optimisation: try to resolve value locally and escape 'get future' creation.
         if (!forcePrimary && ctx.affinityNode()) {
+            ctx.shared().database().checkpointReadLock();
+
             try {
                 Map<K, V> locVals = U.newHashMap(keys.size());
 
@@ -1656,6 +1658,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
             catch (IgniteCheckedException e) {
                 return new GridFinishedFuture<>(e);
             }
+            finally {
+                ctx.shared().database().checkpointReadUnlock();
+            }
         }
 
         if (expiry != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 63a0582..4ef8d65 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -526,6 +526,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
 
         // Optimization: try to resolve value locally and escape 'get future' creation.
         if (!forcePrimary && ctx.affinityNode()) {
+            ctx.shared().database().checkpointReadLock();
+
             try {
                 Map<K, V> locVals = null;
 
@@ -684,6 +686,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte
             catch (IgniteCheckedException e) {
                 return new GridFinishedFuture<>(e);
             }
+            finally {
+                ctx.shared().database().checkpointReadUnlock();
+            }
         }
 
         if (expiryPlc != null)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index c62b1df..6847c69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -393,139 +393,146 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> {
         boolean readNoEntry = ctx.readNoEntry(expiry, false);
         final boolean evt = !skipVals;
 
-        for (K key : keys) {
-            if (key == null)
-                throw new NullPointerException("Null key.");
-
-            KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
+        ctx.shared().database().checkpointReadLock();
 
-            boolean skipEntry = readNoEntry;
+        try {
+            for (K key : keys) {
+                if (key == null)
+                    throw new NullPointerException("Null key.");
 
-            if (readNoEntry) {
-                CacheDataRow row = ctx.offheap().read(ctx, cacheKey);
+                KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 
-                if (row != null) {
-                    long expireTime = row.expireTime();
+                boolean skipEntry = readNoEntry;
 
-                        if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
-                        ctx.addResult(vals,
-                            cacheKey,
-                            row.value(),
-                            skipVals,
-                            false,
-                            deserializeBinary,
-                            true,
-                            null,
-                            row.version(),
-                            0,
-                            0,
-                            needVer);
+                if (readNoEntry) {
+                    CacheDataRow row = ctx.offheap().read(ctx, cacheKey);
 
-                        if (ctx.statisticsEnabled() && !skipVals)
-                            metrics0().onRead(true);
+                    if (row != null) {
+                        long expireTime = row.expireTime();
 
-                        if (evt) {
-                            ctx.events().readEvent(cacheKey,
-                                null,
-                                null,
+                        if (expireTime == 0 || expireTime > U.currentTimeMillis()) {
+                            ctx.addResult(vals,
+                                cacheKey,
                                 row.value(),
-                                subjId,
-                                taskName,
-                                !deserializeBinary);
+                                skipVals,
+                                false,
+                                deserializeBinary,
+                                true,
+                                null,
+                                row.version(),
+                                0,
+                                0,
+                                needVer);
+
+                            if (ctx.statisticsEnabled() && !skipVals)
+                                metrics0().onRead(true);
+
+                            if (evt) {
+                                ctx.events().readEvent(cacheKey,
+                                    null,
+                                    null,
+                                    row.value(),
+                                    subjId,
+                                    taskName,
+                                    !deserializeBinary);
+                            }
                         }
+                        else
+                            skipEntry = false;
                     }
                     else
-                        skipEntry = false;
+                        success = false;
                 }
-                else
-                    success = false;
-            }
 
-            if (!skipEntry) {
-                GridCacheEntryEx entry = null;
+                if (!skipEntry) {
+                    GridCacheEntryEx entry = null;
 
-                while (true) {
-                    try {
-                        entry = entryEx(cacheKey);
+                    while (true) {
+                        try {
+                            entry = entryEx(cacheKey);
 
-                        if (entry != null) {
-                            CacheObject v;
+                            if (entry != null) {
+                                CacheObject v;
 
-                            if (needVer) {
-                                EntryGetResult res = entry.innerGetVersioned(
-                                    null,
-                                    null,
-                                    /*update-metrics*/false,
-                                    /*event*/evt,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary,
-                                    null);
-
-                                if (res != null) {
-                                    ctx.addResult(
-                                        vals,
-                                        cacheKey,
-                                        res,
-                                        skipVals,
-                                        false,
-                                        deserializeBinary,
-                                        true,
-                                        needVer);
+                                if (needVer) {
+                                    EntryGetResult res = entry.innerGetVersioned(
+                                        null,
+                                        null,
+                                        /*update-metrics*/false,
+                                        /*event*/evt,
+                                        subjId,
+                                        null,
+                                        taskName,
+                                        expiry,
+                                        !deserializeBinary,
+                                        null);
+
+                                    if (res != null) {
+                                        ctx.addResult(
+                                            vals,
+                                            cacheKey,
+                                            res,
+                                            skipVals,
+                                            false,
+                                            deserializeBinary,
+                                            true,
+                                            needVer);
+                                    }
+                                    else
+                                        success = false;
                                 }
-                                else
-                                    success = false;
-                            }
-                            else {
-                                v = entry.innerGet(
-                                    null,
-                                    null,
-                                    /*read-through*/false,
-                                    /*update-metrics*/true,
-                                    /*event*/evt,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary);
-
-                                if (v != null) {
-                                    ctx.addResult(vals,
-                                        cacheKey,
-                                        v,
-                                        skipVals,
-                                        false,
-                                        deserializeBinary,
-                                        true,
+                                else {
+                                    v = entry.innerGet(
+                                        null,
                                         null,
-                                        0,
-                                        0);
+                                        /*read-through*/false,
+                                        /*update-metrics*/true,
+                                        /*event*/evt,
+                                        subjId,
+                                        null,
+                                        taskName,
+                                        expiry,
+                                        !deserializeBinary);
+
+                                    if (v != null) {
+                                        ctx.addResult(vals,
+                                            cacheKey,
+                                            v,
+                                            skipVals,
+                                            false,
+                                            deserializeBinary,
+                                            true,
+                                            null,
+                                            0,
+                                            0);
+                                    }
+                                    else
+                                        success = false;
                                 }
-                                else
-                                    success = false;
                             }
+
+                            break; // While.
+                        }
+                        catch (GridCacheEntryRemovedException ignored) {
+                            // No-op, retry.
+                        }
+                        finally {
+                            if (entry != null)
+                                entry.touch();
                         }
 
-                        break; // While.
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        // No-op, retry.
-                    }
-                    finally {
-                        if (entry != null)
-                            entry.touch();
+                        if (!success && storeEnabled)
+                            break;
                     }
-
-                    if (!success && storeEnabled)
-                        break;
+                }
+                if (!success) {
+                    if (!storeEnabled && ctx.statisticsEnabled() && !skipVals)
+                        metrics0().onRead(false);
                 }
             }
-            if (!success) {
-                if (!storeEnabled && ctx.statisticsEnabled() && !skipVals)
-                    metrics0().onRead(false);
-            }
+        }
+        finally {
+            ctx.shared().database().checkpointReadUnlock();
         }
 
         if (success || !storeEnabled)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index a255413..f109e7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -2695,19 +2695,24 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
             IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
             int amount
         ) throws IgniteCheckedException {
-            GridDhtLocalPartition part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false);
+            GridDhtLocalPartition part = null;
 
-            // Skip non-owned partitions.
-            if (part == null || part.state() != OWNING)
-                return 0;
+            if (!grp.isLocal()) {
+                part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false);
+
+                // Skip non-owned partitions.
+                if (part == null || part.state() != OWNING)
+                    return 0;
+            }
 
             cctx.shared().database().checkpointReadLock();
+
             try {
-                if (!part.reserve())
+                if (part != null && !part.reserve())
                     return 0;
 
                 try {
-                    if (part.state() != OWNING)
+                    if (part != null && part.state() != OWNING)
                         return 0;
 
                     long now = U.currentTimeMillis();
@@ -2753,7 +2758,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                     return cleared;
                 }
                 finally {
-                    part.release();
+                    if (part != null)
+                        part.release();
                 }
             }
             finally {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index 8cf31ed..cedb3d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -1146,10 +1146,13 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
     private void removeObsolete(IgniteInternalTx tx) {
         Collection<IgniteTxEntry> entries = tx.local() ? tx.allEntries() : tx.writeEntries();
 
-        for (IgniteTxEntry entry : entries) {
-            cctx.database().checkpointReadLock();
+        if (F.isEmpty(entries))
+            return;
 
-            try {
+        cctx.database().checkpointReadLock();
+
+        try {
+            for (IgniteTxEntry entry : entries) {
                 GridCacheEntryEx cached = entry.cached();
 
                 GridCacheContext cacheCtx = entry.context();
@@ -1177,10 +1180,11 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter {
                     U.error(log, "Failed to remove obsolete entry from cache: " + cached, e);
                 }
             }
-            finally {
-                cctx.database().checkpointReadUnlock();
-            }
         }
+        finally {
+            cctx.database().checkpointReadUnlock();
+        }
+
     }
 
     /**
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
index 5e58e5b..6809f70 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsWithTtlDeactivateOnHighloadTest.java
@@ -17,16 +17,19 @@ package org.apache.ignite.internal.processors.cache.persistence.db;
  * limitations under the License.
  */
 
-import javax.cache.expiry.AccessedExpiryPolicy;
-import javax.cache.expiry.Duration;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CacheRebalanceMode;
 import org.apache.ignite.cache.CacheWriteSynchronizationMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
@@ -34,6 +37,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.failure.FailureContext;
 import org.apache.ignite.failure.FailureHandler;
@@ -42,6 +46,7 @@ import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.MvccFeatureChecker;
@@ -49,6 +54,8 @@ import org.apache.ignite.testframework.junits.WithSystemProperty;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
@@ -59,10 +66,22 @@ import static org.apache.ignite.testframework.GridTestUtils.waitForAllFutures;
 @WithSystemProperty(key = IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, value = "5")
 public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstractTest {
     /** */
-    private static final String CACHE_NAME = "expirable-cache";
+    private static final String CACHE_NAME_ATOMIC = "expirable-cache-atomic";
+
+    /** */
+    private static final String CACHE_NAME_TX = "expirable-cache-tx";
 
     /** */
-    private static final String GROUP_NAME = "group1";
+    private static final String CACHE_NAME_LOCAL_ATOMIC = "expirable-cache-local-atomic";
+
+    /** */
+    private static final String CACHE_NAME_LOCAL_TX = "expirable-cache-local-tx";
+
+    /** */
+    private static final String CACHE_NAME_NEAR_ATOMIC = "expirable-cache-near-atomic";
+
+    /** */
+    private static final String CACHE_NAME_NEAR_TX = "expirable-cache-near-tx";
 
     /** */
     private static final int PART_SIZE = 2;
@@ -74,7 +93,7 @@ public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstract
     private static final int ENTRIES = 10;
 
     /** */
-    private static final int WORKLOAD_THREADS_CNT = 8;
+    private static final int WORKLOAD_THREADS_CNT = 16;
 
     /** Fail. */
     private volatile boolean fail;
@@ -111,7 +130,16 @@ public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstract
                         .setPersistenceEnabled(true)
                 ).setWalMode(WALMode.LOG_ONLY));
 
-        cfg.setCacheConfiguration(getCacheConfiguration(CACHE_NAME));
+        cfg.setCacheConfiguration(
+            getCacheConfiguration(CACHE_NAME_ATOMIC).setAtomicityMode(ATOMIC),
+            getCacheConfiguration(CACHE_NAME_TX).setAtomicityMode(TRANSACTIONAL),
+            getCacheConfiguration(CACHE_NAME_LOCAL_ATOMIC).setAtomicityMode(ATOMIC).setCacheMode(CacheMode.LOCAL),
+            getCacheConfiguration(CACHE_NAME_LOCAL_TX).setAtomicityMode(TRANSACTIONAL).setCacheMode(CacheMode.LOCAL),
+            getCacheConfiguration(CACHE_NAME_NEAR_ATOMIC).setAtomicityMode(ATOMIC)
+                .setNearConfiguration(new NearCacheConfiguration<>()),
+            getCacheConfiguration(CACHE_NAME_NEAR_TX).setAtomicityMode(TRANSACTIONAL)
+                .setNearConfiguration(new NearCacheConfiguration<>())
+        );
 
         return cfg;
     }
@@ -133,11 +161,10 @@ public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstract
      * @param name Cache name.
      * @return Cache configuration.
      */
-    private CacheConfiguration getCacheConfiguration(String name) {
-        CacheConfiguration ccfg = new CacheConfiguration();
+    private CacheConfiguration<?, ?> getCacheConfiguration(String name) {
+        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>();
 
         ccfg.setName(name);
-        ccfg.setGroupName(GROUP_NAME);
         ccfg.setAffinity(new RendezvousAffinityFunction(false, PART_SIZE));
         ccfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.MILLISECONDS, EXPIRATION_TIMEOUT)));
         ccfg.setEagerTtl(true);
@@ -151,20 +178,21 @@ public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstract
      * @throws Exception if failed.
      */
     @Test
+    @SuppressWarnings("LockAcquiredButNotSafelyReleased")
     public void shouldNotBeProblemToPutToExpiredCacheConcurrentlyWithCheckpoint() throws Exception {
         IgniteEx ig0 = startGrid(0);
 
         ig0.cluster().active(true);
 
-        IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME);
+        IgniteCache<Object, Object> cache = ig0.getOrCreateCache(CACHE_NAME_ATOMIC);
 
         AtomicBoolean timeoutReached = new AtomicBoolean(false);
 
         GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig0.context().cache().context().database();
 
-        IgniteInternalFuture ldrFut = runMultiThreadedAsync(() -> {
+        IgniteInternalFuture<?> ldrFut = runMultiThreadedAsync(() -> {
             while (!timeoutReached.get()) {
-                Map map = new TreeMap();
+                Map<Object, Object> map = new TreeMap<>();
 
                 for (int i = 0; i < ENTRIES; i++)
                     map.put(i, i);
@@ -173,14 +201,14 @@ public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstract
             }
         }, WORKLOAD_THREADS_CNT, "loader");
 
-        IgniteInternalFuture updaterFut = runMultiThreadedAsync(() -> {
+        IgniteInternalFuture<?> updaterFut = runMultiThreadedAsync(() -> {
             while (!timeoutReached.get()) {
                 for (int i = 0; i < ENTRIES; i++)
                     cache.put(i, i * 10);
             }
         }, WORKLOAD_THREADS_CNT, "updater");
 
-        IgniteInternalFuture cpWriteLockUnlockFut = runAsync(() -> {
+        IgniteInternalFuture<?> cpWriteLockUnlockFut = runAsync(() -> {
             ReentrantReadWriteLock lock = U.field(db, "checkpointLock");
 
             while (!timeoutReached.get()) {
@@ -214,21 +242,33 @@ public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstract
     public void shouldNotBeProblemToPutToExpiredCacheConcurrently() throws Exception {
         final AtomicBoolean end = new AtomicBoolean();
 
-        final IgniteEx srv = startGrid(0);
+        final IgniteEx srv = startGrids(3);
 
         srv.cluster().active(true);
 
-        // Start high workload
-        IgniteInternalFuture loadFut = runMultiThreadedAsync(() -> {
+        // Start high workload.
+        IgniteInternalFuture<?> loadFut = runMultiThreadedAsync(() -> {
+            List<IgniteCache<Object, Object>> caches = F.asList(
+                srv.cache(CACHE_NAME_ATOMIC),
+                srv.cache(CACHE_NAME_TX),
+                srv.cache(CACHE_NAME_LOCAL_ATOMIC),
+                srv.cache(CACHE_NAME_LOCAL_TX),
+                srv.cache(CACHE_NAME_NEAR_ATOMIC),
+                srv.cache(CACHE_NAME_NEAR_TX)
+            );
+
             while (!end.get() && !fail) {
-                IgniteCache<Integer, byte[]> cache = srv.cache(CACHE_NAME);
+                for (IgniteCache<Object, Object> cache : caches) {
+                    for (int i = 0; i < ENTRIES; i++)
+                        cache.put(i, new byte[1024]);
 
-                for (int i = 0; i < ENTRIES; i++)
-                    cache.put(i, new byte[1024]);
+                    cache.putAll(new TreeMap<>(F.asMap(0, new byte[1024], 1, new byte[1024])));
 
-                //Touch entries.
-                for (int i = 0; i < ENTRIES; i++)
-                    cache.get(i); // touch entries
+                    for (int i = 0; i < ENTRIES; i++)
+                        cache.get(i);
+
+                    cache.getAll(new TreeSet<>(F.asList(0, 1)));
+                }
             }
         }, WORKLOAD_THREADS_CNT, "high-workload");
 
@@ -247,14 +287,4 @@ public class IgnitePdsWithTtlDeactivateOnHighloadTest extends GridCommonAbstract
 
         assertFalse("Failure handler was called. See log above.", fail);
     }
-
-    /** */
-    protected void fillCache(IgniteCache<Integer, byte[]> cache) {
-        for (int i = 0; i < ENTRIES; i++)
-            cache.put(i, new byte[1024]);
-
-        //Touch entries.
-        for (int i = 0; i < ENTRIES; i++)
-            cache.get(i); // touch entries
-    }
 }

Reply via email to