This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new b9644393603 IGNITE-22069 Optimize persistent entries expiration - 
Fixes #11319.
b9644393603 is described below

commit b96443936038a0fece129a6112ece79964473f83
Author: Aleksey Plekhanov <[email protected]>
AuthorDate: Sat Apr 27 10:43:53 2024 +0300

    IGNITE-22069 Optimize persistent entries expiration - Fixes #11319.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../jmh/cache/JmhCacheExpireBenchmark.java         |  16 ++-
 .../internal/processors/cache/GridCacheUtils.java  |  10 --
 .../cache/IgniteCacheOffheapManagerImpl.java       |  48 +++++----
 .../cache/persistence/GridCacheOffheapManager.java |  73 +++++++------
 .../cache/persistence/tree/BPlusTree.java          |  37 +++++--
 .../cache/transactions/TransactionProxyImpl.java   |  13 ++-
 .../processors/cache/tree/PendingEntriesTree.java  |  12 +++
 .../cache/GridCacheTtlManagerSelfTest.java         |  27 +++++
 .../expiry/IgniteCacheExpiryPolicyTestSuite.java   |   3 +-
 .../cache/expiry/PendingTreeCleaningTest.java      | 118 +++++++++++++++++++++
 10 files changed, 281 insertions(+), 76 deletions(-)

diff --git 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
index 4c25bbc12b4..79b9bf12a78 100644
--- 
a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
+++ 
b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheExpireBenchmark.java
@@ -24,7 +24,10 @@ import javax.cache.expiry.Duration;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -33,6 +36,7 @@ import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Measurement;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
@@ -66,6 +70,10 @@ public class JmhCacheExpireBenchmark {
     /** Cache with expire policy. */
     private IgniteCache<Integer, Integer> cacheExp;
 
+    /** Persistence enabled. */
+    @Param({"FALSE", "TRUE"})
+    private String persistence;
+
     /** */
     @Benchmark
     public void putWithExpire() {
@@ -87,7 +95,13 @@ public class JmhCacheExpireBenchmark {
      */
     @Setup(Level.Trial)
     public void setup() {
-        ignite = Ignition.start(new 
IgniteConfiguration().setIgniteInstanceName("test"));
+        ignite = Ignition.start(new 
IgniteConfiguration().setIgniteInstanceName("test")
+            .setDataStorageConfiguration(new 
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+                new 
DataRegionConfiguration().setPersistenceEnabled(Boolean.parseBoolean(persistence))
+            ))
+        );
+
+        ignite.cluster().state(ClusterState.ACTIVE);
 
         cacheReg = ignite.getOrCreateCache(new 
CacheConfiguration<>("CACHE_REG"));
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index dd87b339ed3..0a12ac3959c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -860,16 +860,6 @@ public class GridCacheUtils {
         ctx.ttl().expire(TTL_BATCH_SIZE);
     }
 
-    /**
-     * @param ctx Shared cache context.
-     */
-    public static <K, V> void unwindEvicts(GridCacheSharedContext<K, V> ctx) {
-        assert ctx != null;
-
-        for (GridCacheContext<K, V> cacheCtx : ctx.cacheContexts())
-            unwindEvicts(cacheCtx);
-    }
-
     /**
      * @param asc {@code True} for ascending.
      * @return Descending order comparator.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index c1f23ebe791..edfe8d0bc75 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -271,13 +271,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 if (pendingEntries != null) {
                     PendingRow row = new PendingRow(cacheId);
 
-                    GridCursor<PendingRow> cursor = pendingEntries.find(row, 
row, PendingEntriesTree.WITHOUT_KEY);
-
-                    while (cursor.next()) {
-                        boolean res = pendingEntries.removex(cursor.get());
-
-                        assert res;
-                    }
+                    while (pendingEntries.removex(row, row, -1));
                 }
             }
         }
@@ -1135,26 +1129,36 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 return 0;
 
             try {
-                List<PendingRow> rows = pendingEntries.remove(
-                    new PendingRow(cacheId, Long.MIN_VALUE, 0), new 
PendingRow(cacheId, U.currentTimeMillis(), 0), amount);
+                int cleared = 0;
+
+                do {
+                    List<PendingRow> rows = pendingEntries.remove(new 
PendingRow(cacheId, Long.MIN_VALUE, 0),
+                        new PendingRow(cacheId, U.currentTimeMillis(), 0), 
amount - cleared);
+
+                    if (rows.isEmpty())
+                        break;
 
-                for (PendingRow row : rows) {
-                    if (row.key.partition() == -1)
-                        row.key.partition(cctx.affinity().partition(row.key));
+                    for (PendingRow row : rows) {
+                        if (row.key.partition() == -1)
+                            
row.key.partition(cctx.affinity().partition(row.key));
 
-                    assert row.key != null && row.link != 0 && row.expireTime 
!= 0 : row;
+                        assert row.key != null && row.link != 0 && 
row.expireTime != 0 : row;
 
-                    if (obsoleteVer == null)
-                        obsoleteVer = cctx.cache().nextVersion();
+                        if (obsoleteVer == null)
+                            obsoleteVer = cctx.cache().nextVersion();
 
-                    GridCacheEntryEx entry = cctx.cache().entryEx(row.key 
instanceof KeyCacheObjectImpl
-                        ? new 
ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) : 
row.key);
+                        GridCacheEntryEx entry = cctx.cache().entryEx(row.key 
instanceof KeyCacheObjectImpl
+                            ? new 
ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) : 
row.key);
+
+                        if (entry != null)
+                            c.apply(entry, obsoleteVer);
+                    }
 
-                    if (entry != null)
-                        c.apply(entry, obsoleteVer);
+                    cleared += rows.size();
                 }
+                while (amount < 0 || cleared < amount);
 
-                return rows.size();
+                return cleared;
             }
             finally {
                 busyLock.leaveBusy();
@@ -1977,7 +1981,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
      * This entry key is used to indicate that an expired entry has already 
been deleted from
      * PendingEntriesTree and doesn't need to participate in 
PendingEntriesTree cleanup again.
      */
-    private static class ExpiredKeyCacheObject extends KeyCacheObjectImpl {
+    protected static class ExpiredKeyCacheObject extends KeyCacheObjectImpl {
         /** Serial version uid. */
         private static final long serialVersionUID = 0L;
 
@@ -1988,7 +1992,7 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         private long link;
 
         /** */
-        private ExpiredKeyCacheObject(KeyCacheObjectImpl keyCacheObj, long 
expireTime, long link) {
+        public ExpiredKeyCacheObject(KeyCacheObjectImpl keyCacheObj, long 
expireTime, long link) {
             super(keyCacheObj.val, keyCacheObj.valBytes, 
keyCacheObj.partition());
 
             this.expireTime = expireTime;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 12fd42aed61..9de2efe0e31 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -30,6 +30,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
@@ -67,6 +68,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
@@ -1107,7 +1109,28 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         try {
             int cleared = 0;
 
+            // Use random shift to reduce contention.
+            int shift = 
ThreadLocalRandom.current().nextInt(F.size(cacheDataStores().iterator()));
+
+            int cnt = 0;
+            for (CacheDataStore store : cacheDataStores()) {
+                if (cnt++ < shift) // On the first iteration skip entries 
before <shift>.
+                    continue;
+
+                cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, 
unwindThrottlingTimeout, amount - cleared);
+
+                if (amount != -1 && cleared >= amount)
+                    return true;
+            }
+
+            if (shift == 0)
+                return false;
+
+            cnt = 0;
             for (CacheDataStore store : cacheDataStores()) {
+                if (cnt++ >= shift) // On the second iteration skip entries 
after <shift>.
+                    break;
+
                 cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, 
unwindThrottlingTimeout, amount - cleared);
 
                 if (amount != -1 && cleared >= amount)
@@ -2625,17 +2648,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
             if (pendingTree != null) {
                 PendingRow row = new PendingRow(cacheId);
 
-                GridCursor<PendingRow> cursor = pendingTree.find(row, row, 
PendingEntriesTree.WITHOUT_KEY);
-
-                while (cursor.next()) {
-                    PendingRow row0 = cursor.get();
-
-                    assert row0.link != 0 : row;
-
-                    boolean res = pendingTree.removex(row0);
-
-                    assert res;
-                }
+                while (pendingTree.removex(row, row, -1));
             }
 
             delegate0.clear(cacheId);
@@ -2723,43 +2736,37 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                     long now = U.currentTimeMillis();
 
-                    GridCursor<PendingRow> cur;
-
-                    if (grp.sharedGroup())
-                        cur = pendingTree.find(new PendingRow(cctx.cacheId()), 
new PendingRow(cctx.cacheId(), now, 0));
-                    else
-                        cur = pendingTree.find(null, new 
PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
-
-                    if (!cur.next())
-                        return 0;
-
-                    GridCacheVersion obsoleteVer = null;
+                    int cacheId = grp.sharedGroup() ? cctx.cacheId() : 
CU.UNDEFINED_CACHE_ID;
 
                     int cleared = 0;
 
                     do {
-                        PendingRow row = cur.get();
+                        List<PendingRow> rows = pendingTree.remove(new 
PendingRow(cacheId, Long.MIN_VALUE, 0),
+                            new PendingRow(cacheId, now, 0), amount - cleared);
+
+                        if (rows.isEmpty())
+                            break;
 
-                        if (amount != -1 && cleared > amount)
-                            return cleared;
+                        for (PendingRow row : rows) {
+                            row.key.partition(partId);
 
-                        assert row.key != null && row.link != 0 && 
row.expireTime != 0 : row;
+                            assert row.key != null && row.link != 0 && 
row.expireTime != 0 : row;
 
-                        row.key.partition(partId);
+                            GridCacheVersion obsoleteVer = null;
 
-                        if (pendingTree.removex(row)) {
                             if (obsoleteVer == null)
                                 obsoleteVer = cctx.cache().nextVersion();
 
-                            GridCacheEntryEx e1 = 
cctx.cache().entryEx(row.key);
+                            GridCacheEntryEx entry = 
cctx.cache().entryEx(row.key instanceof KeyCacheObjectImpl
+                                ? new 
ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) : 
row.key);
 
-                            if (e1 != null)
-                                c.apply(e1, obsoleteVer);
+                            if (entry != null)
+                                c.apply(entry, obsoleteVer);
                         }
 
-                        cleared++;
+                        cleared += rows.size();
                     }
-                    while (cur.next());
+                    while (amount < 0 || cleared < amount);
 
                     return cleared;
                 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
index 055138e9172..9816a18aeba 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/BPlusTree.java
@@ -724,7 +724,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
 
             // Search row should point to the rightmost element, otherwise we 
won't find it on the inner node.
             if (res == FOUND && r.needReplaceInner == TRUE)
-                r.row = getRow(io, leafAddr, highIdx);
+                r.row = getRow(io, leafAddr, highIdx, r.x);
 
             return res;
         }
@@ -2160,7 +2160,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         assert canGetRowFromInner : "Not supported";
         assert limit >= 0 : limit;
 
-        RemoveRange rmvOp = new RemoveRange(lower, upper, true, limit);
+        RemoveRange rmvOp = new RemoveRange(lower, upper, true, null, limit);
 
         doRemove(rmvOp);
 
@@ -2169,6 +2169,20 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         return Collections.unmodifiableList(rmvOp.removedRows);
     }
 
+    /**
+     * @param lower Lower bound (inclusive).
+     * @param upper Upper bound (inclusive).
+     * @param x Implementation specific argument.
+     * @param limit Limit of processed entries by single call, {@code 0} or 
negative value for no limit.
+     * @return {@code True} if removed at least one row.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected boolean removex(L lower, L upper, Object x, int limit) throws 
IgniteCheckedException {
+        Boolean res = (Boolean)doRemove(new RemoveRange(lower, upper, false, 
x, limit));
+
+        return res != null ? res : false;
+    }
+
     /** {@inheritDoc} */
     @Override public void invoke(L row, Object z, InvokeClosure<T> c) throws 
IgniteCheckedException {
         checkDestroyed();
@@ -4716,6 +4730,9 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
         /** */
         final boolean needOld;
 
+        /** */
+        final Object x;
+
         /** */
         final PageHandler<Remove, Result> rmvFromLeafHnd;
 
@@ -4724,18 +4741,20 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param needOld {@code True} If need return old value.
          */
         private Remove(L row, boolean needOld) {
-            this(row, needOld, rmvFromLeaf);
+            this(row, needOld, null, rmvFromLeaf);
         }
 
         /**
          * @param row Row.
          * @param needOld {@code True} If need return old value.
+         * @param x Implementation specific argument.
          * @param rmvFromLeaf Remove from leaf page handler.
          */
-        private Remove(L row, boolean needOld, PageHandler<Remove, Result> 
rmvFromLeaf) {
+        private Remove(L row, boolean needOld, Object x, PageHandler<Remove, 
Result> rmvFromLeaf) {
             super(row);
 
             this.needOld = needOld;
+            this.x = x;
 
             rmvFromLeafHnd = rmvFromLeaf;
         }
@@ -5193,7 +5212,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             assert !isRemoved() : "already removed";
 
             // Detach the row.
-            rmvd = needOld ? getRow(io, pageAddr, idx) : (T)Boolean.TRUE;
+            rmvd = needOld ? getRow(io, pageAddr, idx, x) : (T)Boolean.TRUE;
 
             doRemove(pageId, page, pageAddr, walPlc, io, cnt, idx);
 
@@ -6602,9 +6621,11 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
          * @param lower Lower bound (inclusive).
          * @param upper Upper bound (inclusive).
          * @param needOld {@code True} If need return old value.
+         * @param x Implementation specific argument, {@code null} always 
means that we need a full detached data row.
+         * @param limit Limit of processed entries by single call, {@code 0} 
or negative value for no limit.
          */
-        protected RemoveRange(L lower, L upper, boolean needOld, int limit) {
-            super(lower, needOld, rmvRangeFromLeaf);
+        protected RemoveRange(L lower, L upper, boolean needOld, Object x, int 
limit) {
+            super(lower, needOld, x, rmvRangeFromLeaf);
 
             this.lower = lower;
             this.upper = upper;
@@ -6667,7 +6688,7 @@ public abstract class BPlusTree<L, T extends L> extends 
DataStructure implements
             // Delete from right to left to reduce the number of items moved 
during the delete operation.
             for (int i = highIdx; i >= idx; i--) {
                 if (needOld)
-                    removedRows.add(getRow(io, pageAddr, i));
+                    removedRows.add(getRow(io, pageAddr, i, x));
 
                 doRemove(pageId, page, pageAddr, walPlc, io, cnt - highIdx + 
i, i);
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
index 3cc390e1477..7a2bd504d8b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TransactionProxyImpl.java
@@ -26,10 +26,12 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.tracing.MTC;
 import org.apache.ignite.internal.processors.tracing.Span;
+import org.apache.ignite.internal.util.GridIntList;
 import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
 import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
@@ -144,7 +146,16 @@ public class TransactionProxyImpl<K, V> implements 
TransactionProxy, Externaliza
      */
     private void leave() {
         try {
-            CU.unwindEvicts(cctx);
+            GridIntList cacheIds = tx.txState().cacheIds();
+
+            for (int i = 0; i < cacheIds.size(); i++) {
+                int cacheId = cacheIds.get(i);
+
+                GridCacheContext<K, V> ctx = cctx.cacheContext(cacheId);
+
+                if (ctx != null)
+                    CU.unwindEvicts(ctx);
+            }
 
             tx.leaveSystemSection();
         }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
index 282237e050a..a54c357e51c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java
@@ -120,6 +120,18 @@ public class PendingEntriesTree extends 
BPlusTree<PendingRow, PendingRow> {
         return Long.compare(link, row.link);
     }
 
+    /**
+     * @param lower Lower bound (inclusive).
+     * @param upper Upper bound (inclusive).
+     * @param limit Limit of processed entries by single call, {@code 0} or 
negative value for no limit.
+     * @return {@code True} if removed at least one row.
+     * @throws IgniteCheckedException If failed.
+     */
+    public boolean removex(PendingRow lower, PendingRow upper, int limit) 
throws IgniteCheckedException {
+        return removex(lower, upper, WITHOUT_KEY, limit);
+    }
+
+
     /** {@inheritDoc} */
     @Override public PendingRow getRow(BPlusIO<PendingRow> io, long pageAddr, 
int idx, Object flag)
         throws IgniteCheckedException {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
index 151bb9f130d..a3c9acdfa86 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManagerSelfTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache;
 
+import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
@@ -28,7 +29,10 @@ import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteKernal;
@@ -38,10 +42,13 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.util.typedef.CAX;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
@@ -50,14 +57,28 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED;
 /**
  * TTL manager self test.
  */
+@RunWith(Parameterized.class)
 public class GridCacheTtlManagerSelfTest extends GridCommonAbstractTest {
     /** Test cache mode. */
     protected CacheMode cacheMode;
 
+    /** */
+    @Parameterized.Parameter
+    public boolean pds;
+
+    /** */
+    @Parameterized.Parameters(name = "pds={0}")
+    public static Collection<?> parameters() {
+        return F.asList(false, true);
+    }
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        cfg.setDataStorageConfiguration(new 
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+            new DataRegionConfiguration().setPersistenceEnabled(pds)));
+
         CacheConfiguration ccfg = new CacheConfiguration(DEFAULT_CACHE_NAME);
 
         ccfg.setCacheMode(cacheMode);
@@ -93,6 +114,8 @@ public class GridCacheTtlManagerSelfTest extends 
GridCommonAbstractTest {
 
         final IgniteKernal g = (IgniteKernal)startGrid(0);
 
+        g.cluster().state(ClusterState.ACTIVE);
+
         try {
             final String key = "key";
 
@@ -116,6 +139,7 @@ public class GridCacheTtlManagerSelfTest extends 
GridCommonAbstractTest {
         }
         finally {
             stopAllGrids();
+            cleanPersistenceDir();
         }
     }
 
@@ -186,6 +210,8 @@ public class GridCacheTtlManagerSelfTest extends 
GridCommonAbstractTest {
 
             final int records = 1500;
 
+            g.cluster().state(ClusterState.ACTIVE);
+
             IgniteCache<Object, Object> cache = 
g.cache(DEFAULT_CACHE_NAME).withExpiryPolicy(
                 new CreatedExpiryPolicy(new Duration(MILLISECONDS, 1000)));
 
@@ -213,6 +239,7 @@ public class GridCacheTtlManagerSelfTest extends 
GridCommonAbstractTest {
         }
         finally {
             BPlusTree.testHndWrapper = null;
+            cleanPersistenceDir();
         }
     }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
index 4900dad3892..3e0cdb5ee33 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyTestSuite.java
@@ -57,7 +57,8 @@ import org.junit.runners.Suite;
 
     IgniteCacheExpireWhileRebalanceTest.class,
 
-    ExpiryPolicyInfoLoggingTest.class
+    ExpiryPolicyInfoLoggingTest.class,
+    PendingTreeCleaningTest.class,
 })
 public class IgniteCacheExpiryPolicyTestSuite {
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/PendingTreeCleaningTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/PendingTreeCleaningTest.java
new file mode 100644
index 00000000000..37cccc54cab
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/PendingTreeCleaningTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.expiry;
+
+import java.util.Collection;
+import javax.cache.configuration.FactoryBuilder;
+import javax.cache.expiry.CreatedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheGroupContext;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+/**
+ *
+ */
+@RunWith(Parameterized.class)
+public class PendingTreeCleaningTest extends GridCommonAbstractTest {
+    /** */
+    @Parameterized.Parameter
+    public boolean pds;
+
+    /** */
+    @Parameterized.Parameters(name = "pds={0}")
+    public static Collection<?> parameters() {
+        return F.asList(false, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        DataStorageConfiguration dsCfg = new 
DataStorageConfiguration().setDefaultDataRegionConfiguration(
+                new DataRegionConfiguration().setPersistenceEnabled(pds));
+
+        cfg.setDataStorageConfiguration(dsCfg);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testPendingTreeCleaningOnCacheDestroy() throws Exception {
+        IgniteEx ignite = startGrid();
+
+        ignite.cluster().state(ClusterState.ACTIVE);
+
+        String cache1 = "cache1";
+        String cache2 = "cache2";
+        String grp = "grp";
+
+        ignite.getOrCreateCache(new 
CacheConfiguration<>(cache1).setGroupName(grp)
+            .setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(Duration.ONE_DAY))));
+
+        ignite.getOrCreateCache(new 
CacheConfiguration<>(cache2).setGroupName(grp)
+            .setExpiryPolicyFactory(FactoryBuilder.factoryOf(new 
CreatedExpiryPolicy(Duration.ONE_DAY))));
+
+        int cnt = 10_000;
+
+        try (IgniteDataStreamer<Object, Object> ds = 
ignite.dataStreamer(cache1)) {
+            for (int i = 0; i < cnt; i++)
+                ds.addData(i, i);
+        }
+
+        try (IgniteDataStreamer<Object, Object> ds = 
ignite.dataStreamer(cache2)) {
+            for (int i = 0; i < cnt; i++)
+                ds.addData(i, i);
+        }
+
+        CacheGroupContext gctx = 
ignite.context().cache().cache(cache1).context().group();
+
+        assertEquals(cnt * 2, gctx.offheap().expiredSize());
+
+        ignite.destroyCache(cache2);
+
+        assertEquals(cnt, gctx.offheap().expiredSize());
+    }
+}

Reply via email to