IGNITE-5874 Store TTL expire times in B+ tree on per-partition basis - Fixes 
#3231.

Signed-off-by: Ivan Rakov <ira...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89c77573
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89c77573
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89c77573

Branch: refs/heads/master
Commit: 89c775737936645eaf739b494cc1740cd9605095
Parents: 01f6054
Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com>
Authored: Fri May 11 18:45:38 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Fri May 11 18:45:38 2018 +0300

----------------------------------------------------------------------
 .../JettyRestProcessorAbstractSelfTest.java     |  21 -
 .../PdsWithTtlCompatibilityTest.java            | 191 +++++++++
 .../IgniteCompatibilityBasicTestSuite.java      |   3 +
 .../apache/ignite/IgniteSystemProperties.java   |  11 +
 .../delta/MetaPageUpdateLastAllocatedIndex.java |   6 +-
 .../processors/cache/CacheGroupContext.java     |  16 +-
 .../processors/cache/GridCacheMapEntry.java     |  41 +-
 .../cache/IgniteCacheOffheapManager.java        |  21 +-
 .../cache/IgniteCacheOffheapManagerImpl.java    | 128 +++---
 .../distributed/dht/GridDhtLocalPartition.java  | 156 +++----
 .../dht/GridDhtPartitionsStateValidator.java    |   1 +
 .../GridDhtPartitionsExchangeFuture.java        |  14 +-
 .../persistence/GridCacheOffheapManager.java    | 415 ++++++++++++++-----
 .../UpgradePendingTreeToPerPartitionTask.java   | 380 +++++++++++++++++
 .../cache/persistence/tree/io/PageIO.java       |  11 +-
 .../tree/io/PagePartitionMetaIO.java            |  35 +-
 .../tree/io/PagePartitionMetaIOV2.java          |  90 ++++
 ...idCachePartitionsStateValidatorSelfTest.java |  10 +-
 .../IgnitePdsContinuousRestartTest.java         |  89 +++-
 .../IgnitePdsContinuousRestartTest2.java        | 281 -------------
 ...dsContinuousRestartTestWithExpiryPolicy.java |  67 +++
 .../IgniteBaselineAbstractFullApiSelfTest.java  |   9 +-
 .../persistence/db/IgnitePdsWithTtlTest.java    | 197 +++++++++
 .../ignite/testsuites/IgnitePdsTestSuite.java   |   2 +
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   5 +-
 25 files changed, 1582 insertions(+), 618 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
 
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
index 0285f3a..96391e9 100644
--- 
a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
+++ 
b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java
@@ -978,27 +978,10 @@ public abstract class JettyRestProcessorAbstractSelfTest 
extends AbstractRestPro
         assertCacheOperation(ret, true);
     }
 
-    /** */
-    private void failIgnite_5874() {
-        DataStorageConfiguration dsCfg = 
ignite(0).configuration().getDataStorageConfiguration();
-
-        if (dsCfg.getDefaultDataRegionConfiguration().isPersistenceEnabled())
-            fail("IGNITE-5874");
-
-        if (!F.isEmpty(dsCfg.getDataRegionConfigurations())) {
-            for (DataRegionConfiguration dataRegCfg : 
dsCfg.getDataRegionConfigurations()) {
-                if (dataRegCfg.isPersistenceEnabled())
-                    fail("IGNITE-5874");
-            }
-        }
-    }
-
     /**
      * @throws Exception If failed.
      */
     public void testPutWithExpiration() throws Exception {
-        failIgnite_5874();
-
         String ret = content(DEFAULT_CACHE_NAME, GridRestCommand.CACHE_PUT,
             "key", "putKey",
             "val", "putVal",
@@ -1035,8 +1018,6 @@ public abstract class JettyRestProcessorAbstractSelfTest 
extends AbstractRestPro
      * @throws Exception If failed.
      */
     public void testAddWithExpiration() throws Exception {
-        failIgnite_5874();
-
         String ret = content(DEFAULT_CACHE_NAME, GridRestCommand.CACHE_ADD,
             "key", "addKey",
             "val", "addVal",
@@ -1176,8 +1157,6 @@ public abstract class JettyRestProcessorAbstractSelfTest 
extends AbstractRestPro
      * @throws Exception If failed.
      */
     public void testReplaceWithExpiration() throws Exception {
-        failIgnite_5874();
-
         jcache().put("replaceKey", "replaceVal");
 
         assertEquals("replaceVal", jcache().get("replaceKey"));

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
----------------------------------------------------------------------
diff --git 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
new file mode 100644
index 0000000..f3649f6
--- /dev/null
+++ 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.compatibility;
+
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+import javax.cache.Cache;
+import javax.cache.expiry.AccessedExpiryPolicy;
+import javax.cache.expiry.Duration;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import 
org.apache.ignite.compatibility.persistence.IgnitePersistenceCompatibilityAbstractTest;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.PersistentStoreConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import 
org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask;
+import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+
+/**
+ * Tests upgrading of the PendingTree to a per-partition basis. The test fills a 
cache with persistence enabled and an ExpiryPolicy
+ * configured on Ignite 2.1, then checks that the entries are correctly 
expired when a node of the new version is started.
+ *
+ * Note: the same test for Ignite 2.3 will always fail because the entry TTL 
update fails with an assertion on the checkpoint lock
+ * check.
+ */
+public class PdsWithTtlCompatibilityTest extends 
IgnitePersistenceCompatibilityAbstractTest {
+    /** */
+    static final String TEST_CACHE_NAME = 
PdsWithTtlCompatibilityTest.class.getSimpleName();
+
+    /** */
+    static final int DURATION_SEC = 10;
+
+    /** */
+    private static final int ENTRIES_CNT = 100;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setPeerClassLoadingEnabled(false);
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setMaxSize(32L * 1024 * 1024)
+                        .setPersistenceEnabled(true)
+                ).setWalMode(WALMode.LOG_ONLY));
+
+        return cfg;
+    }
+
+    /**
+     * Tests that a node can start on persistence data written by Ignite 2.1.0.
+     *
+     * @throws Exception If failed.
+     */
+    public void testNodeStartByOldVersionPersistenceData_2_1() throws 
Exception {
+        doTestStartupWithOldVersion("2.1.0");
+    }
+
+    /**
+     * Tests that a node can start on persistence data written by an older Ignite.
+     *
+     * @param igniteVer Three-part Ignite version string, e.g. {@code 2.1.0}.
+     * @throws Exception If failed.
+     */
+    protected void doTestStartupWithOldVersion(String igniteVer) throws 
Exception {
+        try {
+            startGrid(1, igniteVer, new ConfigurationClosure(), new 
PostStartupClosure());
+
+            stopAllGrids();
+
+            IgniteEx ignite = startGrid(0);
+
+            assertEquals(1, ignite.context().discovery().topologyVersion());
+
+            ignite.active(true);
+
+            validateResultingCacheData(ignite, ignite.cache(TEST_CACHE_NAME));
+        }
+        finally {
+            stopAllGrids();
+        }
+    }
+
+    /**
+     * @param cache Cache to be filled with test keys and values. The result may be 
validated by {@link
+     * #validateResultingCacheData(Ignite, IgniteCache)}.
+     */
+    public static void saveCacheData(Cache<Object, Object> cache) {
+        for (int i = 0; i < ENTRIES_CNT; i++)
+            cache.put(i, "data-" + i);
+
+        //Touch
+        for (int i = 0; i < ENTRIES_CNT; i++)
+            assertNotNull(cache.get(i));
+    }
+
+    /**
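+     * Broadcasts the pending tree upgrade task, waits until the entry TTL has 
elapsed and asserts that every saved entry has expired.
+     * @param ignite Node used to broadcast the upgrade task.
+     * @param cache Cache that was filled using {@link #saveCacheData(Cache)}.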
+     */
+    public static void validateResultingCacheData(Ignite ignite,
+        IgniteCache<Object, Object> cache) throws 
IgniteInterruptedCheckedException {
+
+        final long expireTime = System.currentTimeMillis() + 
TimeUnit.SECONDS.toMillis(DURATION_SEC + 1);
+
+        final IgniteFuture<Collection<Boolean>> future = 
ignite.compute().broadcastAsync(new UpgradePendingTreeToPerPartitionTask());
+
+        GridTestUtils.waitForCondition(new PA() {
+            @Override public boolean apply() {
+                return future.isDone() && expireTime < 
System.currentTimeMillis();
+            }
+        }, TimeUnit.SECONDS.toMillis(DURATION_SEC + 2));
+
+        for (Boolean res : future.get())
+            assertTrue(res);
+
+        for (int i = 0; i < ENTRIES_CNT; i++)
+            assertNull(cache.get(i));
+    }
+
+    /** */
+    public static class ConfigurationClosure implements 
IgniteInClosure<IgniteConfiguration> {
+        /** {@inheritDoc} */
+        @Override public void apply(IgniteConfiguration cfg) {
+            cfg.setLocalHost("127.0.0.1");
+
+            TcpDiscoverySpi disco = new TcpDiscoverySpi();
+            
disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER);
+
+            cfg.setDiscoverySpi(disco);
+
+            cfg.setPeerClassLoadingEnabled(false);
+
+            cfg.setPersistentStoreConfiguration(new 
PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY));
+        }
+    }
+
+    /** */
+    public static class PostStartupClosure implements IgniteInClosure<Ignite> {
+        /** {@inheritDoc} */
+        @Override public void apply(Ignite ignite) {
+            ignite.active(true);
+
+            CacheConfiguration<Object, Object> cacheCfg = new 
CacheConfiguration<>();
+            cacheCfg.setName(TEST_CACHE_NAME);
+            cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+            cacheCfg.setBackups(1);
+            
cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+            cacheCfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.SECONDS, DURATION_SEC)));
+            cacheCfg.setEagerTtl(true);
+            cacheCfg.setGroupName("myGroup");
+
+            IgniteCache<Object, Object> cache = ignite.createCache(cacheCfg);
+
+            saveCacheData(cache);
+
+            ignite.active(false);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
----------------------------------------------------------------------
diff --git 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
index b526137..f6dd736 100644
--- 
a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
+++ 
b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.compatibility.testsuites;
 
 import junit.framework.TestSuite;
+import org.apache.ignite.compatibility.PdsWithTtlCompatibilityTest;
 import 
org.apache.ignite.compatibility.persistence.FoldersReuseCompatibilityTest;
 import 
org.apache.ignite.compatibility.persistence.MigratingToWalV2SerializerWithCompactionTest;
 import 
org.apache.ignite.compatibility.persistence.PersistenceBasicCompatibilityTest;
@@ -35,6 +36,8 @@ public class IgniteCompatibilityBasicTestSuite {
 
         suite.addTestSuite(PersistenceBasicCompatibilityTest.class);
 
+        suite.addTestSuite(PdsWithTtlCompatibilityTest.class);
+
         suite.addTestSuite(FoldersReuseCompatibilityTest.class);
 
         suite.addTestSuite(MigratingToWalV2SerializerWithCompactionTest.class);

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 008974c..727e809 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -888,6 +888,17 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = 
"IGNITE_DISABLE_WAL_DURING_REBALANCING";
 
     /**
+     * When set to {@code true}, Ignite will skip the partition size check during 
partition validation after rebalance has finished.
+     * Partition sizes may differ between nodes when an expiry policy is in use, 
which is acceptable due to the lazy entry eviction mechanics.
+     *
+     * There is no need to disable partition size validation either in the normal 
case or when an expiry policy is configured for the cache.
+     * However, it should be disabled manually when an expiry policy is applied on 
a per-entry basis, to hint Ignite to skip this check.
+     *
+     * Default is {@code false}.
+     */
+    public static final String IGNITE_SKIP_PARTITION_SIZE_VALIDATION = 
"IGNITE_SKIP_PARTITION_SIZE_VALIDATION";
+
+    /**
      * Enforces singleton.
      */
     private IgniteSystemProperties() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
index 39f6a03..324227b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java
@@ -41,9 +41,11 @@ public class MetaPageUpdateLastAllocatedIndex extends 
PageDeltaRecord {
 
     /** {@inheritDoc} */
     @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws 
IgniteCheckedException {
-        assert PageIO.getType(pageAddr) == PageIO.T_META || 
PageIO.getType(pageAddr) == PageIO.T_PART_META;
+        int type = PageIO.getType(pageAddr);
 
-        PageMetaIO io = 
PageMetaIO.VERSIONS.forVersion(PageIO.getVersion(pageAddr));
+        assert type == PageIO.T_META || type == PageIO.T_PART_META;
+
+        PageMetaIO io = PageIO.getPageIO(type, PageIO.getVersion(pageAddr));
 
         io.setLastAllocatedPageCount(pageAddr, lastAllocatedIdx);
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 5f750d5..d1bdbb6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -39,9 +39,9 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import 
org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
@@ -313,7 +313,7 @@ public class CacheGroupContext {
             drEnabled = true;
 
         this.caches = caches;
-   }
+    }
 
     /**
      * @param cctx Cache context.
@@ -372,8 +372,8 @@ public class CacheGroupContext {
         List<GridCacheContext> caches = this.caches;
 
         assert !sharedGroup() && caches.size() == 1 :
-            "stopping=" +  ctx.kernalContext().isStopping() + ", groupName=" + 
ccfg.getGroupName() +
-            ", caches=" + caches;
+            "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + 
ccfg.getGroupName() +
+                ", caches=" + caches;
 
         return caches.get(0);
     }
@@ -434,6 +434,7 @@ public class CacheGroupContext {
             }
         }
     }
+
     /**
      * Adds partition unload event.
      *
@@ -514,13 +515,6 @@ public class CacheGroupContext {
     }
 
     /**
-     * @return {@code True} if fast eviction is allowed.
-     */
-    public boolean allowFastEviction() {
-        return persistenceEnabled() && !queriesEnabled();
-    }
-
-    /**
      * @return {@code True} in case replication is enabled.
      */
     public boolean isDrEnabled() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index 9f3686a..767c314 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.eviction.EvictableEntry;
+import org.apache.ignite.internal.NodeStoppingException;
 import org.apache.ignite.internal.pagemem.wal.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
@@ -401,7 +402,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             checkObsolete();
 
             if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) {
-                assert row == null || row.key() == key: "Unexpected row key";
+                assert row == null || row.key() == key : "Unexpected row key";
 
                 CacheDataRow read = row == null ? cctx.offheap().read(this) : 
row;
 
@@ -1411,7 +1412,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             if (readThrough && needVal && old == null &&
                 (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || 
cctx.loadPreviousValue()))) {
-                    old0 = readThrough(null, key, false, subjId, taskName);
+                old0 = readThrough(null, key, false, subjId, taskName);
 
                 old = cctx.toCacheObject(old0);
 
@@ -2462,7 +2463,14 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         ttlAndExpireTimeExtras(ttl, expireTime);
 
-        storeValue(val, expireTime, ver, null);
+        cctx.shared().database().checkpointReadLock();
+
+        try {
+            storeValue(val, expireTime, ver, null);
+        }
+        finally {
+            cctx.shared().database().checkpointReadUnlock();
+        }
     }
 
     /**
@@ -3108,7 +3116,8 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             GridCacheMvcc mvcc = mvccExtras();
 
             return mvcc != null && mvcc.isLocallyOwnedByIdOrThread(lockVer, 
threadId);
-        } finally {
+        }
+        finally {
             unlockEntry();
         }
     }
@@ -3347,6 +3356,10 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
                     obsolete = true;
             }
         }
+        catch (NodeStoppingException ignore) {
+            if (log.isDebugEnabled())
+                log.warning("Node is stopping while removing expired value.", 
ignore);
+        }
         catch (IgniteCheckedException e) {
             U.error(log, "Failed to clean up expired cache entry: " + this, e);
         }
@@ -3406,7 +3419,14 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
         if (log.isTraceEnabled())
             log.trace("onExpired clear [key=" + key + ", entry=" + 
System.identityHashCode(this) + ']');
 
-        removeValue();
+        cctx.shared().database().checkpointReadLock();
+
+        try {
+            removeValue();
+        }
+        finally {
+            cctx.shared().database().checkpointReadUnlock();
+        }
 
         if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) {
             cctx.events().addEvent(partition(),
@@ -3586,8 +3606,9 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
      * @param ver New entry version.
      * @param oldRow Old row if available.
      * @param predicate Optional predicate.
-     * @throws IgniteCheckedException If update failed.
+     *
      * @return {@code True} if storage was modified.
+     * @throws IgniteCheckedException If update failed.
      */
     protected boolean storeValue(
         @Nullable CacheObject val,
@@ -3599,7 +3620,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
         UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, 
predicate);
 
-        cctx.offheap().invoke(cctx, key,  localPartition(), closure);
+        cctx.offheap().invoke(cctx, key, localPartition(), closure);
 
         return closure.treeOp != IgniteTree.OperationType.NOOP;
     }
@@ -4051,7 +4072,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
     }
 
     /**
-     *  Increments public size of map.
+     * Increments public size of map.
      */
     protected void incrementMapPublicSize() {
         GridDhtLocalPartition locPart = localPartition();
@@ -4782,7 +4803,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
                 needUpdate = true;
             }
-            else if (updateExpireTime && expiryPlc != null && entry.val != 
null){
+            else if (updateExpireTime && expiryPlc != null && entry.val != 
null) {
                 long ttl = expiryPlc.forAccess();
 
                 if (ttl != CU.TTL_NOT_CHANGED) {
@@ -4929,7 +4950,7 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
             if (entry.val == null) {
                 boolean new0 = entry.isStartVersion();
 
-                assert entry.deletedUnlocked() || new0 || entry.isInternal(): 
"Invalid entry [entry=" + entry +
+                assert entry.deletedUnlocked() || new0 || entry.isInternal() : 
"Invalid entry [entry=" + entry +
                     ", locNodeId=" + cctx.localNodeId() + ']';
 
                 if (!new0 && !entry.isInternal())

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index a12c033..fa25412 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -21,12 +21,13 @@ import java.util.Map;
 import javax.cache.Cache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.persistence.RootPage;
 import org.apache.ignite.internal.processors.cache.persistence.RowStore;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
+import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.GridAtomicLong;
@@ -47,7 +48,7 @@ public interface IgniteCacheOffheapManager {
      * @param grp Cache group.
      * @throws IgniteCheckedException If failed.
      */
-    public void start(GridCacheSharedContext ctx, CacheGroupContext grp) 
throws IgniteCheckedException;;
+    public void start(GridCacheSharedContext ctx, CacheGroupContext grp) 
throws IgniteCheckedException;
 
     /**
      * @param cctx Cache context.
@@ -142,6 +143,8 @@ public interface IgniteCacheOffheapManager {
     /**
      * @param cctx Cache context.
      * @param c Closure.
+     * @param amount Limit of entries processed by a single call, {@code -1} for 
no limit.
+     * @return {@code True} if unprocessed expired entries remain.
      * @throws IgniteCheckedException If failed.
      */
     public boolean expire(GridCacheContext cctx, 
IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount)
@@ -167,9 +170,9 @@ public interface IgniteCacheOffheapManager {
 
     /**
      * @param cctx Cache context.
-     * @param key  Key.
-     * @param val  Value.
-     * @param ver  Version.
+     * @param key Key.
+     * @param val Value.
+     * @param ver Version.
      * @param expireTime Expire time.
      * @param oldRow Old row if available.
      * @param part Partition.
@@ -537,5 +540,13 @@ public interface IgniteCacheOffheapManager {
          * @param rowCacheCleaner Rows cache cleaner.
          */
         public void setRowCacheCleaner(GridQueryRowCacheCleaner 
rowCacheCleaner);
+
+        /**
+         * Return PendingTree for data store.
+         *
+         * @return PendingTree instance.
+         * @throws IgniteCheckedException
+         */
+        PendingEntriesTree pendingTree();
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 5c78eb5..bf0de02 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -101,16 +101,16 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
     protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = 
new ConcurrentHashMap<>();
 
     /** */
-    protected PendingEntriesTree pendingEntries;
+    private PendingEntriesTree pendingEntries;
 
     /** */
-    private volatile boolean hasPendingEntries;
+    protected volatile boolean hasPendingEntries;
 
     /** */
     private final GridAtomicLong globalRmvId = new 
GridAtomicLong(U.currentTimeMillis() * 1000_000);
 
     /** */
-    private final GridSpinBusyLock busyLock = new GridSpinBusyLock();
+    protected final GridSpinBusyLock busyLock = new GridSpinBusyLock();
 
     /** */
     private int updateValSizeThreshold;
@@ -148,19 +148,29 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
 
     /** {@inheritDoc} */
     public void onCacheStarted(GridCacheContext cctx) throws 
IgniteCheckedException {
+        initPendingTree(cctx);
+    }
+
+    /**
+     * @param cctx Cache context.
+     * @throws IgniteCheckedException If failed.
+     */
+    protected void initPendingTree(GridCacheContext cctx) throws 
IgniteCheckedException {
+        assert !cctx.group().persistenceEnabled();
+
         if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && 
pendingEntries == null) {
             String name = "PendingEntries";
 
-                long rootPage = allocateForTree();
+            long rootPage = allocateForTree();
 
-                pendingEntries = new PendingEntriesTree(
-                    grp,
-                    name,
-                    grp.dataRegion().pageMemory(),
-                    rootPage,
-                    grp.reuseList(),
-                    true);
-            }
+            pendingEntries = new PendingEntriesTree(
+                grp,
+                name,
+                grp.dataRegion().pageMemory(),
+                rootPage,
+                grp.reuseList(),
+                true);
+        }
     }
 
     /**
@@ -204,11 +214,11 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         try {
             if (grp.sharedGroup()) {
                 assert cacheId != CU.UNDEFINED_CACHE_ID;
-                assert ctx.database().checkpointLockIsHeldByThread();
 
                 for (CacheDataStore store : cacheDataStores())
                     store.clear(cacheId);
 
+                // Clear non-persistent pending tree if needed.
                 if (pendingEntries != null) {
                     PendingRow row = new PendingRow(cacheId);
 
@@ -241,6 +251,14 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         }
     }
 
+    /**
+     * @param part Partition ID; not used for a local cache group, which has a single store.
+     * @return Data store for the given partition; may be {@code null} if none is registered for it.
+     */
+    public CacheDataStore dataStore(int part) {
+        return grp.isLocal() ? locCacheDataStore : partDataStores.get(part);
+    }
+
     /** {@inheritDoc} */
     @Override public long cacheEntriesCount(int cacheId) {
         long size = 0;
@@ -1011,51 +1029,56 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
     ) throws IgniteCheckedException {
         assert !cctx.isNear() : cctx.name();
 
-        if (hasPendingEntries && pendingEntries != null) {
-            GridCacheVersion obsoleteVer = null;
+        if (!hasPendingEntries || pendingEntries == null)
+            return false;
 
-            long now = U.currentTimeMillis();
+        GridCacheVersion obsoleteVer = null;
 
-            GridCursor<PendingRow> cur;
+        long now = U.currentTimeMillis();
 
-            if (grp.sharedGroup())
-                cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new 
PendingRow(cctx.cacheId(), now, 0));
-            else
-                cur = pendingEntries.find(null, new 
PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
+        GridCursor<PendingRow> cur;
 
-            if (!cur.next())
-                return false;
+        if (grp.sharedGroup())
+            cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new 
PendingRow(cctx.cacheId(), now, 0));
+        else
+            cur = pendingEntries.find(null, new 
PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
 
-            int cleared = 0;
+        if (!cur.next())
+            return false;
 
-            cctx.shared().database().checkpointReadLock();
+        int cleared = 0;
 
-            try {
-                do {
-                    PendingRow row = cur.get();
+        if (!busyLock.enterBusy())
+            return false;
 
-                    if (amount != -1 && cleared > amount)
-                        return true;
+        try {
+            do {
+                if (amount != -1 && cleared > amount)
+                    return true;
 
-                    if (row.key.partition() == -1)
-                        row.key.partition(cctx.affinity().partition(row.key));
+                PendingRow row = cur.get();
 
-                    assert row.key != null && row.link != 0 && row.expireTime 
!= 0 : row;
+                if (row.key.partition() == -1)
+                    row.key.partition(cctx.affinity().partition(row.key));
 
-                    if (pendingEntries.removex(row)) {
-                        if (obsoleteVer == null)
-                            obsoleteVer = ctx.versions().next();
+                assert row.key != null && row.link != 0 && row.expireTime != 0 
: row;
 
-                        c.apply(cctx.cache().entryEx(row.key), obsoleteVer);
-                    }
+                if (pendingEntries.removex(row)) {
+                    if (obsoleteVer == null)
+                        obsoleteVer = ctx.versions().next();
+
+                    GridCacheEntryEx entry = cctx.cache().entryEx(row.key);
 
-                    cleared++;
+                    if (entry != null)
+                        c.apply(entry, obsoleteVer);
                 }
-                while (cur.next());
-            }
-            finally {
-                cctx.shared().database().checkpointReadUnlock();
+
+                cleared++;
             }
+            while (cur.next());
+        }
+        finally {
+            busyLock.leaveBusy();
         }
 
         return false;
@@ -1395,15 +1418,15 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             if (oldRow != null) {
                 assert oldRow.link() != 0 : oldRow;
 
-                if (pendingEntries != null && oldRow.expireTime() != 0)
-                    pendingEntries.removex(new PendingRow(cacheId, 
oldRow.expireTime(), oldRow.link()));
+                if (pendingTree() != null && oldRow.expireTime() != 0)
+                    pendingTree().removex(new PendingRow(cacheId, 
oldRow.expireTime(), oldRow.link()));
 
                 if (newRow.link() != oldRow.link())
                     rowStore.removeRow(oldRow.link());
             }
 
-            if (pendingEntries != null && expireTime != 0) {
-                pendingEntries.putx(new PendingRow(cacheId, expireTime, 
newRow.link()));
+            if (pendingTree() != null && expireTime != 0) {
+                pendingTree().putx(new PendingRow(cacheId, expireTime, 
newRow.link()));
 
                 hasPendingEntries = true;
             }
@@ -1444,8 +1467,8 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == 
cacheId :
                     "Incorrect cache ID [expected=" + cacheId + ", actual=" + 
oldRow.cacheId() + "].";
 
-                if (pendingEntries != null && oldRow.expireTime() != 0)
-                    pendingEntries.removex(new PendingRow(cacheId, 
oldRow.expireTime(), oldRow.link()));
+                if (pendingTree() != null && oldRow.expireTime() != 0)
+                    pendingTree().removex(new PendingRow(cacheId, 
oldRow.expireTime(), oldRow.link()));
 
                 decrementSize(cctx.cacheId());
             }
@@ -1543,7 +1566,6 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
         /** {@inheritDoc} */
         @Override public void clear(int cacheId) throws IgniteCheckedException 
{
             assert cacheId != CU.UNDEFINED_CACHE_ID;
-            assert ctx.database().checkpointLockIsHeldByThread();
 
             if (cacheSize(cacheId) == 0)
                 return;
@@ -1624,6 +1646,11 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
             }
         }
 
+        /** {@inheritDoc} */
+        @Override public PendingEntriesTree pendingTree() {
+            return pendingEntries;
+        }
+
         /**
          * @param cctx Cache context.
          * @param key Key.
@@ -1676,5 +1703,4 @@ public class IgniteCacheOffheapManagerImpl implements 
IgniteCacheOffheapManager
                 return 0;
         }
     }
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index be74eff..a199f6c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -47,7 +47,6 @@ import 
org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
-import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -57,9 +56,9 @@ import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.util.deque.FastSizeDeque;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
-import org.apache.ignite.util.deque.FastSizeDeque;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
@@ -342,9 +341,6 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      * @return {@code True} if partition is empty.
      */
     public boolean isEmpty() {
-        if (grp.allowFastEviction())
-            return internalSize() == 0;
-
         return store.fullSize() == 0 && internalSize() == 0;
     }
 
@@ -981,78 +977,76 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
         long cleared = 0;
 
-        if (!grp.allowFastEviction()) {
-            CacheMapHolder hld = grp.sharedGroup() ? null : 
singleCacheEntryMap;
+        CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
 
-            try {
-                GridIterator<CacheDataRow> it0 = 
grp.offheap().partitionIterator(id);
+        try {
+            GridIterator<CacheDataRow> it0 = 
grp.offheap().partitionIterator(id);
 
-                while (it0.hasNext()) {
-                    ctx.database().checkpointReadLock();
+            while (it0.hasNext()) {
+                ctx.database().checkpointReadLock();
 
-                    try {
-                        CacheDataRow row = it0.next();
-
-                        // Do not clear fresh rows in case of single partition 
clearing.
-                        if (row.version().compareTo(clearVer) >= 0 && (state() 
== MOVING && clear))
-                            continue;
-
-                        if (grp.sharedGroup() && (hld == null || 
hld.cctx.cacheId() != row.cacheId()))
-                            hld = 
cacheMapHolder(ctx.cacheContext(row.cacheId()));
-
-                        assert hld != null;
-
-                        GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
-                            hld,
-                            hld.cctx,
-                            grp.affinity().lastVersion(),
-                            row.key(),
-                            true,
-                            false);
-
-                        if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
-                            removeEntry(cached);
-
-                            if (rec && !hld.cctx.config().isEventsDisabled()) {
-                                hld.cctx.events().addEvent(cached.partition(),
-                                    cached.key(),
-                                    ctx.localNodeId(),
-                                    (IgniteUuid)null,
-                                    null,
-                                    EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
-                                    null,
-                                    false,
-                                    cached.rawGet(),
-                                    cached.hasValue(),
-                                    null,
-                                    null,
-                                    null,
-                                    false);
-                            }
-
-                            cleared++;
+                try {
+                    CacheDataRow row = it0.next();
+
+                    // Do not clear fresh rows in case of single partition 
clearing.
+                    if (row.version().compareTo(clearVer) >= 0 && (state() == 
MOVING && clear))
+                        continue;
+
+                    if (grp.sharedGroup() && (hld == null || 
hld.cctx.cacheId() != row.cacheId()))
+                        hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
+
+                    assert hld != null;
+
+                    GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
+                        hld,
+                        hld.cctx,
+                        grp.affinity().lastVersion(),
+                        row.key(),
+                        true,
+                        false);
+
+                    if (cached instanceof GridDhtCacheEntry && 
((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
+                        removeEntry(cached);
+
+                        if (rec && !hld.cctx.config().isEventsDisabled()) {
+                            hld.cctx.events().addEvent(cached.partition(),
+                                cached.key(),
+                                ctx.localNodeId(),
+                                (IgniteUuid)null,
+                                null,
+                                EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
+                                null,
+                                false,
+                                cached.rawGet(),
+                                cached.hasValue(),
+                                null,
+                                null,
+                                null,
+                                false);
                         }
-                    }
-                    catch (GridDhtInvalidPartitionException e) {
-                        assert isEmpty() && state() == EVICTED : "Invalid 
error [e=" + e + ", part=" + this + ']';
 
-                        break; // Partition is already concurrently cleared 
and evicted.
-                    }
-                    finally {
-                        ctx.database().checkpointReadUnlock();
+                        cleared++;
                     }
                 }
-            }
-            catch (NodeStoppingException e) {
-                if (log.isDebugEnabled())
-                    log.debug("Failed to get iterator for evicted partition: " 
+ id);
+                catch (GridDhtInvalidPartitionException e) {
+                    assert isEmpty() && state() == EVICTED : "Invalid error 
[e=" + e + ", part=" + this + ']';
 
-                throw e;
-            }
-            catch (IgniteCheckedException e) {
-                U.error(log, "Failed to get iterator for evicted partition: " 
+ id, e);
+                    break; // Partition is already concurrently cleared and 
evicted.
+                }
+                finally {
+                    ctx.database().checkpointReadUnlock();
+                }
             }
         }
+        catch (NodeStoppingException e) {
+            if (log.isDebugEnabled())
+                log.debug("Failed to get iterator for evicted partition: " + 
id);
+
+            throw e;
+        }
+        catch (IgniteCheckedException e) {
+            U.error(log, "Failed to get iterator for evicted partition: " + 
id, e);
+        }
 
         return cleared;
     }
@@ -1406,37 +1400,9 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         }
 
         /**
-         * Recreate cache data store after successful clearing and allowed 
fast eviction.
-         */
-        private void recreateCacheDataStore() {
-            assert grp.offheap() instanceof GridCacheOffheapManager;
-
-            try {
-                CacheDataStore store0 = store;
-
-                store = ((GridCacheOffheapManager) 
grp.offheap()).recreateCacheDataStore(store0);
-
-                // Inject row cache cleaner on store creation
-                // Used in case the cache with enabled SqlOnheapCache is 
single cache at the cache group
-                if (ctx.kernalContext().query().moduleEnabled()) {
-                    GridQueryRowCacheCleaner cleaner = 
ctx.kernalContext().query().getIndexing()
-                            .rowCacheCleaner(grp.groupId());
-
-                    if (store != null && cleaner != null)
-                        store.setRowCacheCleaner(cleaner);
-                }
-            } catch (IgniteCheckedException e) {
-                finish(e);
-            }
-        }
-
-        /**
          * Successfully finishes the future.
          */
         public void finish() {
-            if (state() == MOVING && clear && grp.allowFastEviction())
-                recreateCacheDataStore();
-
             synchronized (this) {
                 onDone();
                 finished = true;

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
index cc0542c..866c513 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java
@@ -95,6 +95,7 @@ public class GridDhtPartitionsStateValidator {
 
         // Validate cache sizes.
         result = validatePartitionsSizes(top, messages, ignoringNodes);
+
         if (!result.isEmpty())
             throw new IgniteCheckedException("Partitions cache sizes are 
inconsistent for " + fold(topVer, result));
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index 39f4ed1..1b79b76 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -136,6 +136,13 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
     /** */
     private static final IgniteProductVersion FORCE_AFF_REASSIGNMENT_SINCE = 
IgniteProductVersion.fromString("2.4.3");
 
+    /**
+     * This may be useful when per-entry (not per-cache based) expiry 
policy is in use.
+     * See {@link 
IgniteSystemProperties#IGNITE_SKIP_PARTITION_SIZE_VALIDATION} for details.
+     * Default value is {@code false}.
+     */
+    private static final boolean SKIP_PARTITION_SIZE_VALIDATION = 
Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION);
+
     /** */
     @GridToStringExclude
     private final Object mux = new Object();
@@ -2755,13 +2762,16 @@ public class GridDhtPartitionsExchangeFuture extends 
GridDhtTopologyFutureAdapte
                     grpCtx.topology() :
                     cctx.exchange().clientTopology(grpId, 
events().discoveryCache());
 
-            // Do not validate read or write through caches or caches with 
disabled rebalance.
+            // Do not validate read or write through caches or caches with 
disabled rebalance
+            // or caches with ExpiryPolicy set, or when validation is disabled by system property.
             if (grpCtx == null
                     || grpCtx.config().isReadThrough()
                     || grpCtx.config().isWriteThrough()
                     || grpCtx.config().getCacheStoreFactory() != null
                     || grpCtx.config().getRebalanceDelay() == -1
-                    || grpCtx.config().getRebalanceMode() == 
CacheRebalanceMode.NONE)
+                    || grpCtx.config().getRebalanceMode() == 
CacheRebalanceMode.NONE
+                    || grpCtx.config().getExpiryPolicyFactory() != null
+                    || SKIP_PARTITION_SIZE_VALIDATION)
                 continue;
 
             try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 5feaa25..d7cc623 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -40,6 +40,7 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId;
@@ -49,6 +50,8 @@ import 
org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
@@ -56,6 +59,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
 import 
org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeListImpl;
+import 
org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask;
 import 
org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import 
org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
 import 
org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange;
@@ -64,6 +68,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV2;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
@@ -71,10 +76,13 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointe
 import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
 import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
+import org.apache.ignite.internal.processors.cache.tree.PendingRow;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
 import org.apache.ignite.internal.util.lang.GridCursor;
+import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -95,7 +103,14 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     private ReuseListImpl reuseList;
 
     /** {@inheritDoc} */
+    @Override protected void initPendingTree(GridCacheContext cctx) throws 
IgniteCheckedException {
+        // No-op. Per-partition PendingTree should be used.
+    }
+
+    /** {@inheritDoc} */
     @Override protected void initDataStructures() throws 
IgniteCheckedException {
+        assert ctx.database().checkpointLockIsHeldByThread();
+
         Metas metas = getOrAllocateCacheMetas();
 
         RootPage reuseListRoot = metas.reuseListRoot;
@@ -122,29 +137,12 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         
((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this);
     }
 
-    /** {@inheritDoc} */
-    @Override public void onCacheStarted(GridCacheContext cctx) throws 
IgniteCheckedException {
-        if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && 
pendingEntries == null) {
-            ctx.database().checkpointReadLock();
-
-            try {
-                final String name = "PendingEntries";
-
-                RootPage pendingRootPage = 
indexStorage.getOrAllocateForTree(name);
-
-                pendingEntries = new PendingEntriesTree(
-                    grp,
-                    name,
-                    grp.dataRegion().pageMemory(),
-                    pendingRootPage.pageId().pageId(),
-                    reuseList,
-                    pendingRootPage.isAllocated()
-                );
-            }
-            finally {
-                ctx.database().checkpointReadUnlock();
-            }
-        }
+    /**
+     * Get internal IndexStorage; see {@link UpgradePendingTreeToPerPartitionTask} for details.
+     * @return The {@code indexStorage} field of this manager.
+     */
+    public IndexStorage getIndexStorage() {
+        return indexStorage;
     }
 
     /** {@inheritDoc} */
@@ -218,8 +216,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                 int grpId = grp.groupId();
                 long partMetaId = pageMem.partitionMetaPageId(grpId, 
store.partId());
-                long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
 
+                long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
                 try {
                     long partMetaPageAddr = pageMem.writeLock(grpId, 
partMetaId, partMetaPage);
 
@@ -274,7 +272,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                         if (needSnapshot) {
                             pageCnt = this.ctx.pageStore().pages(grpId, 
store.partId());
 
-                            io.setCandidatePageCount(partMetaPageAddr, size == 
0 ? 0: pageCnt);
+                            io.setCandidatePageCount(partMetaPageAddr, size == 
0 ? 0 : pageCnt);
 
                             if (saveMeta) {
                                 saveMeta(ctx);
@@ -285,7 +283,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                             if (state == OWNING) {
                                 assert part != null;
 
-                                if(!addPartition(
+                                if (!addPartition(
                                     part,
                                     ctx.partitionStatMap(),
                                     partMetaPageAddr,
@@ -295,8 +293,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                                     this.ctx.pageStore().pages(grpId, 
store.partId()),
                                     store.fullSize()
                                 ))
-                                    U.warn(log,"Partition was concurrently 
evicted grpId=" +  grpId +
-                                            ", partitionId=" + part.id());
+                                    U.warn(log, "Partition was concurrently 
evicted grpId=" + grpId +
+                                        ", partitionId=" + part.id());
                             }
                             else if (state == MOVING || state == RENTING) {
                                 if 
(ctx.partitionStatMap().forceSkipIndexPartition(grpId)) {
@@ -333,7 +331,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                 }
             }
             else if (needSnapshot)
-                tryAddEmptyPartitionToSnapshot(store, ctx);;
+                tryAddEmptyPartitionToSnapshot(store, ctx);
         }
         else if (needSnapshot)
             tryAddEmptyPartitionToSnapshot(store, ctx);
@@ -350,8 +348,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     private void tryAddEmptyPartitionToSnapshot(CacheDataStore store, Context 
ctx) {
         if (getPartition(store).state() == OWNING) {
             ctx.partitionStatMap().put(
-                    new GroupPartitionId(grp.groupId(), store.partId()),
-                    new PagesAllocationRange(0, 0));
+                new GroupPartitionId(grp.groupId(), store.partId()),
+                new PagesAllocationRange(0, 0));
         }
     }
 
@@ -362,7 +360,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
      */
     private GridDhtLocalPartition getPartition(CacheDataStore store) {
         return grp.topology().localPartition(store.partId(),
-                AffinityTopologyVersion.NONE, false, true);
+            AffinityTopologyVersion.NONE, false, true);
     }
 
     /**
@@ -385,7 +383,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
         long nextId = cntrsPageId;
 
-        while (true){
+        while (true) {
             final long curId = nextId;
             final long curPage = pageMem.acquirePage(grpId, curId);
 
@@ -542,19 +540,19 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
      * @param currAllocatedPageCnt total number of pages allocated for 
partition <code>[partition, grpId]</code>
      */
     private static boolean addPartition(
-            GridDhtLocalPartition part,
-            final PartitionAllocationMap map,
-            final long metaPageAddr,
-            final PageMetaIO io,
-            final int grpId,
-            final int partId,
-            final int currAllocatedPageCnt,
-            final long partSize
+        GridDhtLocalPartition part,
+        final PartitionAllocationMap map,
+        final long metaPageAddr,
+        final PageMetaIO io,
+        final int grpId,
+        final int partId,
+        final int currAllocatedPageCnt,
+        final long partSize
     ) {
         if (part != null) {
             boolean reserved = part.reserve();
 
-            if(!reserved)
+            if (!reserved)
                 return false;
         }
         else
@@ -596,43 +594,6 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         }
     }
 
-    /**
-     * Destroys given {@code store} and creates new with the same update 
counters as in given.
-     *
-     * @param store Store to destroy.
-     * @return New cache data store.
-     * @throws IgniteCheckedException If failed.
-     */
-    public CacheDataStore recreateCacheDataStore(CacheDataStore store) throws 
IgniteCheckedException {
-        long updCounter = store.updateCounter();
-        long initUpdCounter = store.initialUpdateCounter();
-
-        int p = store.partId();
-
-        PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
-
-        int tag = pageMemory.invalidate(grp.groupId(), p);
-
-        ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag);
-
-        CacheDataStore store0;
-
-        partStoreLock.lock(p);
-
-        try {
-            store0 = createCacheDataStore0(p);
-            store0.updateCounter(updCounter);
-            store0.updateInitialCounter(initUpdCounter);
-
-            partDataStores.put(p, store0);
-        }
-        finally {
-            partStoreLock.unlock(p);
-        }
-
-        return store0;
-    }
-
     /** {@inheritDoc} */
     @Override public void onPartitionCounterUpdated(int part, long cntr) {
         CacheDataStore store = partDataStores.get(part);
@@ -743,7 +704,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                 return new Metas(
                     new RootPage(new FullPageId(metastoreRoot, grpId), 
allocated),
-                    new RootPage(new FullPageId(reuseListRoot, grpId), 
allocated));
+                    new RootPage(new FullPageId(reuseListRoot, grpId), 
allocated),
+                    null);
             }
             finally {
                 pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated);
@@ -787,6 +749,47 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         return iterator;
     }
 
+    /** {@inheritDoc} */
+    @Override public boolean expire(
+        GridCacheContext cctx,
+        IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
+        int amount
+    ) throws IgniteCheckedException {
+        assert !cctx.isNear() : cctx.name();
+
+        if (!hasPendingEntries || !busyLock.enterBusy())
+            return false;
+
+        try {
+            int cleared = 0;
+
+            for (CacheDataStore store : cacheDataStores()) {
+                // Pass the -1 "no limit" sentinel through unchanged: "amount - cleared" would turn it into
+                // some other negative value, which purgeExpired() takes for an already exhausted limit.
+                int limit = amount == -1 ? -1 : amount - cleared;
+                cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, limit);
+
+                if (amount != -1 && cleared >= amount)
+                    return true;
+            }
+        }
+        finally {
+            busyLock.leaveBusy();
+        }
+
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long expiredSize() throws IgniteCheckedException {
+        long total = 0;
+
+        for (CacheDataStore partStore : cacheDataStores())
+            total += ((GridCacheDataStore)partStore).expiredSize();
+
+        return total;
+    }
+
     /**
      * Calculates free space of all partition data stores - number of bytes 
available for use in allocated pages.
      *
@@ -1098,13 +1101,18 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         @GridToStringInclude
         private final RootPage treeRoot;
 
+        /** */
+        @GridToStringInclude
+        private final RootPage pendingTreeRoot;
+
         /**
          * @param treeRoot Metadata storage root.
          * @param reuseListRoot Reuse list root.
          */
-        Metas(RootPage treeRoot, RootPage reuseListRoot) {
+        Metas(RootPage treeRoot, RootPage reuseListRoot, RootPage 
pendingTreeRoot) {
             this.treeRoot = treeRoot;
             this.reuseListRoot = reuseListRoot;
+            this.pendingTreeRoot = pendingTreeRoot;
         }
 
         /** {@inheritDoc} */
@@ -1116,7 +1124,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
     /**
      *
      */
-    private class GridCacheDataStore implements CacheDataStore {
+    public class GridCacheDataStore implements CacheDataStore {
         /** */
         private final int partId;
 
@@ -1127,6 +1135,9 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         private volatile CacheFreeListImpl freeList;
 
         /** */
+        private PendingEntriesTree pendingTree;
+
+        /** */
         private volatile CacheDataStore delegate;
 
         /** */
@@ -1164,11 +1175,10 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                     return null;
             }
 
-            IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
-
-            dbMgr.checkpointReadLock();
-
             if (init.compareAndSet(false, true)) {
+                IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
+
+                dbMgr.checkpointReadLock();
                 try {
                     Metas metas = getOrAllocatePartitionMetas();
 
@@ -1183,6 +1193,7 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                         ctx.wal(),
                         reuseRoot.pageId().pageId(),
                         reuseRoot.isAllocated()) {
+                        /** {@inheritDoc} */
                         @Override protected long allocatePageNoReuse() throws 
IgniteCheckedException {
                             assert 
grp.shared().database().checkpointLockIsHeldByThread();
 
@@ -1201,6 +1212,24 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                         rowStore,
                         treeRoot.pageId().pageId(),
                         treeRoot.isAllocated()) {
+                        /** {@inheritDoc} */
+                        @Override protected long allocatePageNoReuse() throws 
IgniteCheckedException {
+                            assert 
grp.shared().database().checkpointLockIsHeldByThread();
+
+                            return pageMem.allocatePage(grpId, partId, 
PageIdAllocator.FLAG_DATA);
+                        }
+                    };
+
+                    RootPage pendingTreeRoot = metas.pendingTreeRoot;
+
+                    final PendingEntriesTree pendingTree0 = new 
PendingEntriesTree(
+                        grp,
+                        "PendingEntries-" + partId,
+                        grp.dataRegion().pageMemory(),
+                        pendingTreeRoot.pageId().pageId(),
+                        reuseList,
+                        pendingTreeRoot.isAllocated()) {
+                        /** {@inheritDoc} */
                         @Override protected long allocatePageNoReuse() throws 
IgniteCheckedException {
                             assert 
grp.shared().database().checkpointLockIsHeldByThread();
 
@@ -1210,7 +1239,17 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                     PageMemoryEx pageMem = 
(PageMemoryEx)grp.dataRegion().pageMemory();
 
-                    delegate0 = new CacheDataStoreImpl(partId, name, rowStore, 
dataTree);
+                    delegate0 = new CacheDataStoreImpl(partId, name, rowStore, 
dataTree) {
+                        /** {@inheritDoc} */
+                        @Override public PendingEntriesTree pendingTree() {
+                            return pendingTree0;
+                        }
+                    };
+
+                    pendingTree = pendingTree0;
+
+                    if (!hasPendingEntries && pendingTree0.size() > 0)
+                        hasPendingEntries = true;
 
                     int grpId = grp.groupId();
                     long partMetaId = pageMem.partitionMetaPageId(grpId, 
partId);
@@ -1258,8 +1297,6 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
                 }
             }
             else {
-                dbMgr.checkpointReadUnlock();
-
                 U.await(latch);
 
                 delegate0 = delegate;
@@ -1280,13 +1317,15 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
             int grpId = grp.groupId();
             long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
+
             long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
             try {
                 boolean allocated = false;
-                long pageAddr = pageMem.writeLock(grpId, partMetaId, 
partMetaPage);
+                boolean pendingTreeAllocated = false;
 
+                long pageAddr = pageMem.writeLock(grpId, partMetaId, 
partMetaPage);
                 try {
-                    long treeRoot, reuseListRoot;
+                    long treeRoot, reuseListRoot, pendingTreeRoot;
 
                     // Initialize new page.
                     if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
@@ -1296,22 +1335,18 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
                         treeRoot = pageMem.allocatePage(grpId, partId, 
PageMemory.FLAG_DATA);
                         reuseListRoot = pageMem.allocatePage(grpId, partId, 
PageMemory.FLAG_DATA);
+                        pendingTreeRoot = pageMem.allocatePage(grpId, partId, 
PageMemory.FLAG_DATA);
 
                         assert PageIdUtils.flag(treeRoot) == 
PageMemory.FLAG_DATA;
                         assert PageIdUtils.flag(reuseListRoot) == 
PageMemory.FLAG_DATA;
+                        assert PageIdUtils.flag(pendingTreeRoot) == 
PageMemory.FLAG_DATA;
 
                         io.setTreeRoot(pageAddr, treeRoot);
                         io.setReuseListRoot(pageAddr, reuseListRoot);
+                        io.setPendingTreeRoot(pageAddr, pendingTreeRoot);
 
                         if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, 
partMetaId, partMetaPage, wal, null))
-                            wal.log(new MetaPageInitRecord(
-                                grpId,
-                                partMetaId,
-                                io.getType(),
-                                io.getVersion(),
-                                treeRoot,
-                                reuseListRoot
-                            ));
+                            wal.log(new PageSnapshot(new 
FullPageId(partMetaId, grpId), pageAddr, pageMem.pageSize()));
 
                         allocated = true;
                     }
@@ -1321,6 +1356,33 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                         treeRoot = io.getTreeRoot(pageAddr);
                         reuseListRoot = io.getReuseListRoot(pageAddr);
 
+                        int pageVersion = PagePartitionMetaIO.getVersion(pageAddr);
+
+                        if (pageVersion < 2) {
+                            assert pageVersion == 1;
+
+                            if (log.isDebugEnabled())
+                                log.debug("Upgrade partition meta page version: [part=" + partId +
+                                    ", grpId=" + grpId + ", oldVer=" + pageVersion +
+                                    ", newVer=" + io.getVersion()
+                                );
+
+                            io = PagePartitionMetaIO.VERSIONS.latest();
+
+                            ((PagePartitionMetaIOV2)io).upgradePage(pageAddr);
+
+                            pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
+
+                            io.setPendingTreeRoot(pageAddr, pendingTreeRoot);
+
+                            if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
+                                wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, pageMem.pageSize()));
+
+                            pendingTreeAllocated = true;
+                        }
+                        else
+                            pendingTreeRoot = io.getPendingTreeRoot(pageAddr);
+
                         if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA)
                             throw new StorageException("Wrong tree root page id flag: treeRoot="
                                 + U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId);
@@ -1328,14 +1390,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                         if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA)
                             throw new StorageException("Wrong reuse list root page id flag: reuseListRoot="
                                 + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId);
+
+                        if (PageIdUtils.flag(pendingTreeRoot) != PageMemory.FLAG_DATA)
+                            throw new StorageException("Wrong pending tree root page id flag: pendingTreeRoot="
+                                + U.hexLong(pendingTreeRoot) + ", part=" + partId + ", grpId=" + grpId);
                     }
 
                     return new Metas(
                         new RootPage(new FullPageId(treeRoot, grpId), allocated),
-                        new RootPage(new FullPageId(reuseListRoot, grpId), allocated));
+                        new RootPage(new FullPageId(reuseListRoot, grpId), allocated),
+                        new RootPage(new FullPageId(pendingTreeRoot, grpId), allocated || pendingTreeAllocated));
                 }
                 finally {
-                    pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated);
+                    pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated || pendingTreeAllocated);
                 }
             }
             finally {
@@ -1485,6 +1552,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
             long expireTime,
             @Nullable CacheDataRow oldRow
         ) throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate = init0(false);
 
             delegate.update(cctx, key, val, ver, expireTime, oldRow);
@@ -1498,6 +1567,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
             GridCacheVersion ver,
             long expireTime,
             @Nullable CacheDataRow oldRow) throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate = init0(false);
 
             return delegate.createRow(cctx, key, val, ver, expireTime, oldRow);
@@ -1506,6 +1577,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         /** {@inheritDoc} */
         @Override public void invoke(GridCacheContext cctx, KeyCacheObject 
key, OffheapInvokeClosure c)
             throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate = init0(false);
 
             delegate.invoke(cctx, key, c);
@@ -1514,6 +1587,8 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
         /** {@inheritDoc} */
         @Override public void remove(GridCacheContext cctx, KeyCacheObject 
key, int partId)
             throws IgniteCheckedException {
+            assert ctx.database().checkpointLockIsHeldByThread();
+
             CacheDataStore delegate = init0(false);
 
             delegate.remove(cctx, key, partId);
@@ -1583,10 +1658,142 @@ public class GridCacheOffheapManager extends 
IgniteCacheOffheapManagerImpl imple
 
         /** {@inheritDoc} */
         @Override public void clear(int cacheId) throws IgniteCheckedException {
-            CacheDataStore delegate = init0(true);
+            CacheDataStore delegate0 = init0(true);
 
-            if (delegate != null)
-                delegate.clear(cacheId);
+            if (delegate0 == null)
+                return;
+
+            ctx.database().checkpointReadLock();
+            try {
+                // Purge this cache's rows from the persistent pending tree before delegating the clear.
+                if (pendingTree != null) {
+                    PendingRow row = new PendingRow(cacheId);
+
+                    GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);
+
+                    while (cursor.next()) {
+                        PendingRow row0 = cursor.get();
+
+                        assert row0.link != 0 : row0;
+
+                        boolean res = pendingTree.removex(row0);
+
+                        assert res;
+                    }
+                }
+
+                delegate0.clear(cacheId);
+            }
+            finally {
+                ctx.database().checkpointReadUnlock();
+            }
+        }
+
+        /**
+         * Counts the rows currently held in the pending entries tree of this store.
+         *
+         * @return Size of the pending entries tree, or {@code 0} if {@code init0(true)} returned no delegate.
+         * @throws IgniteCheckedException If initializing the store or reading the tree size fails.
+         */
+        public long expiredSize() throws IgniteCheckedException {
+            boolean initialized = init0(true) != null;
+
+            return initialized ? pendingTree.size() : 0;
+        }
+
+        /**
+         * Removes expired entries from data store.
+         *
+         * @param cctx Cache context.
+         * @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details.
+         * @param amount Limit of processed entries by single call, {@code -1} for no limit.
+         * @return Number of pending rows processed by this call.
+         * @throws IgniteCheckedException If failed.
+         */
+        public int purgeExpired(GridCacheContext cctx,
+            IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
+            int amount) throws IgniteCheckedException {
+            CacheDataStore delegate0 = init0(true);
+
+            if (delegate0 == null || pendingTree == null)
+                return 0;
+
+            GridDhtLocalPartition part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false);
+
+            // Skip non-owned partitions.
+            if (part == null || part.state() != OWNING || pendingTree.size() == 0)
+                return 0;
+
+            cctx.shared().database().checkpointReadLock();
+            try {
+                if (!part.reserve())
+                    return 0;
+
+                try {
+                    if (part.state() != OWNING)
+                        return 0;
+
+                    long now = U.currentTimeMillis();
+
+                    GridCursor<PendingRow> cur;
+
+                    if (grp.sharedGroup())
+                        cur = pendingTree.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
+                    else
+                        cur = pendingTree.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
+
+                    if (!cur.next())
+                        return 0;
+
+                    GridCacheVersion obsoleteVer = null;
+                    int cleared = 0;
+
+                    do {
+                        // Enforce the limit exactly: ">" would let one row beyond "amount" be processed.
+                        if (amount != -1 && cleared >= amount)
+                            return cleared;
+
+                        PendingRow row = cur.get();
+
+                        assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
+
+                        row.key.partition(partId);
+
+                        if (pendingTree.removex(row)) {
+                            if (obsoleteVer == null)
+                                obsoleteVer = ctx.versions().next();
+
+                            GridCacheEntryEx e1 = cctx.cache().entryEx(row.key);
+
+                            if (e1 != null)
+                                c.apply(e1, obsoleteVer);
+                        }
+
+                        cleared++;
+                    }
+                    while (cur.next());
+
+                    return cleared;
+                }
+                finally {
+                    part.release();
+                }
+            }
+            finally {
+                cctx.shared().database().checkpointReadUnlock();
+            }
+        }
+
+        /** {@inheritDoc} */
+        @Override public PendingEntriesTree pendingTree() {
+            try {
+                boolean initialized = init0(true) != null;
+
+                return initialized ? pendingTree : null;
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
         }
     }
 

Reply via email to