IGNITE-5874 Store TTL expire times in B+ tree on per-partition basis - Fixes #3231.
Signed-off-by: Ivan Rakov <ira...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/89c77573 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/89c77573 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/89c77573 Branch: refs/heads/master Commit: 89c775737936645eaf739b494cc1740cd9605095 Parents: 01f6054 Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Authored: Fri May 11 18:45:38 2018 +0300 Committer: Ivan Rakov <ira...@apache.org> Committed: Fri May 11 18:45:38 2018 +0300 ---------------------------------------------------------------------- .../JettyRestProcessorAbstractSelfTest.java | 21 - .../PdsWithTtlCompatibilityTest.java | 191 +++++++++ .../IgniteCompatibilityBasicTestSuite.java | 3 + .../apache/ignite/IgniteSystemProperties.java | 11 + .../delta/MetaPageUpdateLastAllocatedIndex.java | 6 +- .../processors/cache/CacheGroupContext.java | 16 +- .../processors/cache/GridCacheMapEntry.java | 41 +- .../cache/IgniteCacheOffheapManager.java | 21 +- .../cache/IgniteCacheOffheapManagerImpl.java | 128 +++--- .../distributed/dht/GridDhtLocalPartition.java | 156 +++---- .../dht/GridDhtPartitionsStateValidator.java | 1 + .../GridDhtPartitionsExchangeFuture.java | 14 +- .../persistence/GridCacheOffheapManager.java | 415 ++++++++++++++----- .../UpgradePendingTreeToPerPartitionTask.java | 380 +++++++++++++++++ .../cache/persistence/tree/io/PageIO.java | 11 +- .../tree/io/PagePartitionMetaIO.java | 35 +- .../tree/io/PagePartitionMetaIOV2.java | 90 ++++ ...idCachePartitionsStateValidatorSelfTest.java | 10 +- .../IgnitePdsContinuousRestartTest.java | 89 +++- .../IgnitePdsContinuousRestartTest2.java | 281 ------------- ...dsContinuousRestartTestWithExpiryPolicy.java | 67 +++ .../IgniteBaselineAbstractFullApiSelfTest.java | 9 +- .../persistence/db/IgnitePdsWithTtlTest.java | 197 +++++++++ .../ignite/testsuites/IgnitePdsTestSuite.java | 2 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 5 +- 25 files changed, 1582 insertions(+), 618 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java index 0285f3a..96391e9 100644 --- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java +++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/JettyRestProcessorAbstractSelfTest.java @@ -978,27 +978,10 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro assertCacheOperation(ret, true); } - /** */ - private void failIgnite_5874() { - DataStorageConfiguration dsCfg = ignite(0).configuration().getDataStorageConfiguration(); - - if (dsCfg.getDefaultDataRegionConfiguration().isPersistenceEnabled()) - fail("IGNITE-5874"); - - if (!F.isEmpty(dsCfg.getDataRegionConfigurations())) { - for (DataRegionConfiguration dataRegCfg : dsCfg.getDataRegionConfigurations()) { - if (dataRegCfg.isPersistenceEnabled()) - fail("IGNITE-5874"); - } - } - } - /** * @throws Exception If failed. */ public void testPutWithExpiration() throws Exception { - failIgnite_5874(); - String ret = content(DEFAULT_CACHE_NAME, GridRestCommand.CACHE_PUT, "key", "putKey", "val", "putVal", @@ -1035,8 +1018,6 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro * @throws Exception If failed. */ public void testAddWithExpiration() throws Exception { - failIgnite_5874(); - String ret = content(DEFAULT_CACHE_NAME, GridRestCommand.CACHE_ADD, "key", "addKey", "val", "addVal", @@ -1176,8 +1157,6 @@ public abstract class JettyRestProcessorAbstractSelfTest extends AbstractRestPro * @throws Exception If failed. */ public void testReplaceWithExpiration() throws Exception { - failIgnite_5874(); - jcache().put("replaceKey", "replaceVal"); assertEquals("replaceVal", jcache().get("replaceKey")); http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java new file mode 100644 index 0000000..f3649f6 --- /dev/null +++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/PdsWithTtlCompatibilityTest.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.compatibility; + +import java.util.Collection; +import java.util.concurrent.TimeUnit; +import javax.cache.Cache; +import javax.cache.expiry.AccessedExpiryPolicy; +import javax.cache.expiry.Duration; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.compatibility.persistence.IgnitePersistenceCompatibilityAbstractTest; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.GridCacheAbstractFullApiSelfTest; +import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.testframework.GridTestUtils; + +/** + * Test PendingTree upgrading to per-partition basis. Test fill cache with persistence enabled and with ExpirePolicy + * configured on ignite-2.1 version and check if entries will be correctly expired when a new version node started. + * + * Note: Test for ignite-2.3 version will always fails due to entry ttl update fails with assertion on checkpoint lock + * check. + */ +public class PdsWithTtlCompatibilityTest extends IgnitePersistenceCompatibilityAbstractTest { + /** */ + static final String TEST_CACHE_NAME = PdsWithTtlCompatibilityTest.class.getSimpleName(); + + /** */ + static final int DURATION_SEC = 10; + + /** */ + private static final int ENTRIES_CNT = 100; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setPeerClassLoadingEnabled(false); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setMaxSize(32L * 1024 * 1024) + .setPersistenceEnabled(true) + ).setWalMode(WALMode.LOG_ONLY)); + + return cfg; + } + + /** + * Tests opportunity to read data from previous Ignite DB version. + * + * @throws Exception If failed. + */ + public void testNodeStartByOldVersionPersistenceData_2_1() throws Exception { + doTestStartupWithOldVersion("2.1.0"); + } + + /** + * Tests opportunity to read data from previous Ignite DB version. + * + * @param igniteVer 3-digits version of ignite + * @throws Exception If failed. + */ + protected void doTestStartupWithOldVersion(String igniteVer) throws Exception { + try { + startGrid(1, igniteVer, new ConfigurationClosure(), new PostStartupClosure()); + + stopAllGrids(); + + IgniteEx ignite = startGrid(0); + + assertEquals(1, ignite.context().discovery().topologyVersion()); + + ignite.active(true); + + validateResultingCacheData(ignite, ignite.cache(TEST_CACHE_NAME)); + } + finally { + stopAllGrids(); + } + } + + /** + * @param cache to be filled by different keys and values. Results may be validated in {@link + * #validateResultingCacheData(Ignite, IgniteCache)}. + */ + public static void saveCacheData(Cache<Object, Object> cache) { + for (int i = 0; i < ENTRIES_CNT; i++) + cache.put(i, "data-" + i); + + //Touch + for (int i = 0; i < ENTRIES_CNT; i++) + assertNotNull(cache.get(i)); + } + + /** + * Asserts cache contained all expected values as it was saved before. + * + * @param cache cache should be filled using {@link #saveCacheData(Cache)}. + */ + public static void validateResultingCacheData(Ignite ignite, + IgniteCache<Object, Object> cache) throws IgniteInterruptedCheckedException { + + final long expireTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(DURATION_SEC + 1); + + final IgniteFuture<Collection<Boolean>> future = ignite.compute().broadcastAsync(new UpgradePendingTreeToPerPartitionTask()); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return future.isDone() && expireTime < System.currentTimeMillis(); + } + }, TimeUnit.SECONDS.toMillis(DURATION_SEC + 2)); + + for (Boolean res : future.get()) + assertTrue(res); + + for (int i = 0; i < ENTRIES_CNT; i++) + assertNull(cache.get(i)); + } + + /** */ + public static class ConfigurationClosure implements IgniteInClosure<IgniteConfiguration> { + /** {@inheritDoc} */ + @Override public void apply(IgniteConfiguration cfg) { + cfg.setLocalHost("127.0.0.1"); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + disco.setIpFinder(GridCacheAbstractFullApiSelfTest.LOCAL_IP_FINDER); + + cfg.setDiscoverySpi(disco); + + cfg.setPeerClassLoadingEnabled(false); + + cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration().setWalMode(WALMode.LOG_ONLY)); + } + } + + /** */ + public static class PostStartupClosure implements IgniteInClosure<Ignite> { + /** {@inheritDoc} */ + @Override public void apply(Ignite ignite) { + ignite.active(true); + + CacheConfiguration<Object, Object> cacheCfg = new CacheConfiguration<>(); + cacheCfg.setName(TEST_CACHE_NAME); + cacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cacheCfg.setBackups(1); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cacheCfg.setExpiryPolicyFactory(AccessedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, DURATION_SEC))); + cacheCfg.setEagerTtl(true); + cacheCfg.setGroupName("myGroup"); + + IgniteCache<Object, Object> cache = ignite.createCache(cacheCfg); + + saveCacheData(cache); + + ignite.active(false); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java index b526137..f6dd736 100644 --- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java +++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/testsuites/IgniteCompatibilityBasicTestSuite.java @@ -18,6 +18,7 @@ package org.apache.ignite.compatibility.testsuites; import junit.framework.TestSuite; +import org.apache.ignite.compatibility.PdsWithTtlCompatibilityTest; import org.apache.ignite.compatibility.persistence.FoldersReuseCompatibilityTest; import org.apache.ignite.compatibility.persistence.MigratingToWalV2SerializerWithCompactionTest; import org.apache.ignite.compatibility.persistence.PersistenceBasicCompatibilityTest; @@ -35,6 +36,8 @@ public class IgniteCompatibilityBasicTestSuite { suite.addTestSuite(PersistenceBasicCompatibilityTest.class); + suite.addTestSuite(PdsWithTtlCompatibilityTest.class); + suite.addTestSuite(FoldersReuseCompatibilityTest.class); suite.addTestSuite(MigratingToWalV2SerializerWithCompactionTest.class); http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index 008974c..727e809 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -888,6 +888,17 @@ public final class IgniteSystemProperties { public static final String IGNITE_DISABLE_WAL_DURING_REBALANCING = "IGNITE_DISABLE_WAL_DURING_REBALANCING"; /** + * When set to {@code true}, Ignite will skip partitions sizes check on partition validation after rebalance has finished. + * Partitions sizes may differs on nodes when Expiry Policy is in use and it is ok due to lazy entry eviction mechanics. + * + * There is no need to disable partition size validation either in normal case or when expiry policy is configured for cache. + * But it should be disabled manually when policy is used on per entry basis to hint Ignite to skip this check. + * + * Default is {@code false}. + */ + public static final String IGNITE_SKIP_PARTITION_SIZE_VALIDATION = "IGNITE_SKIP_PARTITION_SIZE_VALIDATION"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java index 39f6a03..324227b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/delta/MetaPageUpdateLastAllocatedIndex.java @@ -41,9 +41,11 @@ public class MetaPageUpdateLastAllocatedIndex extends PageDeltaRecord { /** {@inheritDoc} */ @Override public void applyDelta(PageMemory pageMem, long pageAddr) throws IgniteCheckedException { - assert PageIO.getType(pageAddr) == PageIO.T_META || PageIO.getType(pageAddr) == PageIO.T_PART_META; + int type = PageIO.getType(pageAddr); - PageMetaIO io = PageMetaIO.VERSIONS.forVersion(PageIO.getVersion(pageAddr)); + assert type == PageIO.T_META || type == PageIO.T_PART_META; + + PageMetaIO io = PageIO.getPageIO(type, PageIO.getVersion(pageAddr)); io.setLastAllocatedPageCount(pageAddr, lastAllocatedIdx); } http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 5f750d5..d1bdbb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -39,9 +39,9 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentRequest; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse; -import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopologyImpl; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionsEvictor; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; @@ -313,7 +313,7 @@ public class CacheGroupContext { drEnabled = true; this.caches = caches; - } + } /** * @param cctx Cache context. @@ -372,8 +372,8 @@ public class CacheGroupContext { List<GridCacheContext> caches = this.caches; assert !sharedGroup() && caches.size() == 1 : - "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() + - ", caches=" + caches; + "stopping=" + ctx.kernalContext().isStopping() + ", groupName=" + ccfg.getGroupName() + + ", caches=" + caches; return caches.get(0); } @@ -434,6 +434,7 @@ public class CacheGroupContext { } } } + /** * Adds partition unload event. * @@ -514,13 +515,6 @@ public class CacheGroupContext { } /** - * @return {@code True} if fast eviction is allowed. - */ - public boolean allowFastEviction() { - return persistenceEnabled() && !queriesEnabled(); - } - - /** * @return {@code True} in case replication is enabled. */ public boolean isDrEnabled() { http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 9f3686a..767c314 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -34,6 +34,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.eviction.EvictableEntry; +import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; @@ -401,7 +402,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme checkObsolete(); if (isStartVersion() && ((flags & IS_UNSWAPPED_MASK) == 0)) { - assert row == null || row.key() == key: "Unexpected row key"; + assert row == null || row.key() == key : "Unexpected row key"; CacheDataRow read = row == null ? cctx.offheap().read(this) : row; @@ -1411,7 +1412,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (readThrough && needVal && old == null && (cctx.readThrough() && (op == GridCacheOperation.TRANSFORM || cctx.loadPreviousValue()))) { - old0 = readThrough(null, key, false, subjId, taskName); + old0 = readThrough(null, key, false, subjId, taskName); old = cctx.toCacheObject(old0); @@ -2462,7 +2463,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme ttlAndExpireTimeExtras(ttl, expireTime); - storeValue(val, expireTime, ver, null); + cctx.shared().database().checkpointReadLock(); + + try { + storeValue(val, expireTime, ver, null); + } + finally { + cctx.shared().database().checkpointReadUnlock(); + } } /** @@ -3108,7 +3116,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme GridCacheMvcc mvcc = mvccExtras(); return mvcc != null && mvcc.isLocallyOwnedByIdOrThread(lockVer, threadId); - } finally { + } + finally { unlockEntry(); } } @@ -3347,6 +3356,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme obsolete = true; } } + catch (NodeStoppingException ignore) { + if (log.isDebugEnabled()) + log.warning("Node is stopping while removing expired value.", ignore); + } catch (IgniteCheckedException e) { U.error(log, "Failed to clean up expired cache entry: " + this, e); } @@ -3406,7 +3419,14 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (log.isTraceEnabled()) log.trace("onExpired clear [key=" + key + ", entry=" + System.identityHashCode(this) + ']'); - removeValue(); + cctx.shared().database().checkpointReadLock(); + + try { + removeValue(); + } + finally { + cctx.shared().database().checkpointReadUnlock(); + } if (cctx.events().isRecordable(EVT_CACHE_OBJECT_EXPIRED)) { cctx.events().addEvent(partition(), @@ -3586,8 +3606,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme * @param ver New entry version. * @param oldRow Old row if available. * @param predicate Optional predicate. - * @throws IgniteCheckedException If update failed. + * * @return {@code True} if storage was modified. + * @throws IgniteCheckedException If update failed. */ protected boolean storeValue( @Nullable CacheObject val, @@ -3599,7 +3620,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme UpdateClosure closure = new UpdateClosure(this, val, ver, expireTime, predicate); - cctx.offheap().invoke(cctx, key, localPartition(), closure); + cctx.offheap().invoke(cctx, key, localPartition(), closure); return closure.treeOp != IgniteTree.OperationType.NOOP; } @@ -4051,7 +4072,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } /** - * Increments public size of map. + * Increments public size of map. */ protected void incrementMapPublicSize() { GridDhtLocalPartition locPart = localPartition(); @@ -4782,7 +4803,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme needUpdate = true; } - else if (updateExpireTime && expiryPlc != null && entry.val != null){ + else if (updateExpireTime && expiryPlc != null && entry.val != null) { long ttl = expiryPlc.forAccess(); if (ttl != CU.TTL_NOT_CHANGED) { @@ -4929,7 +4950,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (entry.val == null) { boolean new0 = entry.isStartVersion(); - assert entry.deletedUnlocked() || new0 || entry.isInternal(): "Invalid entry [entry=" + entry + + assert entry.deletedUnlocked() || new0 || entry.isInternal() : "Invalid entry [entry=" + entry + ", locNodeId=" + cctx.localNodeId() + ']'; if (!new0 && !entry.isInternal()) http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index a12c033..fa25412 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -21,12 +21,13 @@ import java.util.Map; import javax.cache.Cache; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.RowStore; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; +import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.util.GridAtomicLong; @@ -47,7 +48,7 @@ public interface IgniteCacheOffheapManager { * @param grp Cache group. * @throws IgniteCheckedException If failed. */ - public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException;; + public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException; /** * @param cctx Cache context. @@ -142,6 +143,8 @@ public interface IgniteCacheOffheapManager { /** * @param cctx Cache context. * @param c Closure. + * @param amount Limit of processed entries by single call, {@code -1} for no limit. + * @return {@code True} if unprocessed expired entries remains. * @throws IgniteCheckedException If failed. */ public boolean expire(GridCacheContext cctx, IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, int amount) @@ -167,9 +170,9 @@ public interface IgniteCacheOffheapManager { /** * @param cctx Cache context. - * @param key Key. - * @param val Value. - * @param ver Version. + * @param key Key. + * @param val Value. + * @param ver Version. * @param expireTime Expire time. * @param oldRow Old row if available. * @param part Partition. @@ -537,5 +540,13 @@ public interface IgniteCacheOffheapManager { * @param rowCacheCleaner Rows cache cleaner. */ public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner); + + /** + * Return PendingTree for data store. + * + * @return PendingTree instance. + * @throws IgniteCheckedException + */ + PendingEntriesTree pendingTree(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 5c78eb5..bf0de02 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -101,16 +101,16 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager protected final ConcurrentMap<Integer, CacheDataStore> partDataStores = new ConcurrentHashMap<>(); /** */ - protected PendingEntriesTree pendingEntries; + private PendingEntriesTree pendingEntries; /** */ - private volatile boolean hasPendingEntries; + protected volatile boolean hasPendingEntries; /** */ private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000); /** */ - private final GridSpinBusyLock busyLock = new GridSpinBusyLock(); + protected final GridSpinBusyLock busyLock = new GridSpinBusyLock(); /** */ private int updateValSizeThreshold; @@ -148,19 +148,29 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { + initPendingTree(cctx); + } + + /** + * @param cctx Cache context. + * @throws IgniteCheckedException If failed. + */ + protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException { + assert !cctx.group().persistenceEnabled(); + if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { String name = "PendingEntries"; - long rootPage = allocateForTree(); + long rootPage = allocateForTree(); - pendingEntries = new PendingEntriesTree( - grp, - name, - grp.dataRegion().pageMemory(), - rootPage, - grp.reuseList(), - true); - } + pendingEntries = new PendingEntriesTree( + grp, + name, + grp.dataRegion().pageMemory(), + rootPage, + grp.reuseList(), + true); + } } /** @@ -204,11 +214,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager try { if (grp.sharedGroup()) { assert cacheId != CU.UNDEFINED_CACHE_ID; - assert ctx.database().checkpointLockIsHeldByThread(); for (CacheDataStore store : cacheDataStores()) store.clear(cacheId); + // Clear non-persistent pending tree if needed. if (pendingEntries != null) { PendingRow row = new PendingRow(cacheId); @@ -241,6 +251,14 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } } + /** + * @param part Partition. + * @return Data store for given entry. + */ + public CacheDataStore dataStore(int part) { + return grp.isLocal() ? locCacheDataStore : partDataStores.get(part); + } + /** {@inheritDoc} */ @Override public long cacheEntriesCount(int cacheId) { long size = 0; @@ -1011,51 +1029,56 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager ) throws IgniteCheckedException { assert !cctx.isNear() : cctx.name(); - if (hasPendingEntries && pendingEntries != null) { - GridCacheVersion obsoleteVer = null; + if (!hasPendingEntries || pendingEntries == null) + return false; - long now = U.currentTimeMillis(); + GridCacheVersion obsoleteVer = null; - GridCursor<PendingRow> cur; + long now = U.currentTimeMillis(); - if (grp.sharedGroup()) - cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0)); - else - cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0)); + GridCursor<PendingRow> cur; - if (!cur.next()) - return false; + if (grp.sharedGroup()) + cur = pendingEntries.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0)); + else + cur = pendingEntries.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0)); - int cleared = 0; + if (!cur.next()) + return false; - cctx.shared().database().checkpointReadLock(); + int cleared = 0; - try { - do { - PendingRow row = cur.get(); + if (!busyLock.enterBusy()) + return false; - if (amount != -1 && cleared > amount) - return true; + try { + do { + if (amount != -1 && cleared > amount) + return true; - if (row.key.partition() == -1) - row.key.partition(cctx.affinity().partition(row.key)); + PendingRow row = cur.get(); - assert row.key != null && row.link != 0 && row.expireTime != 0 : row; + if (row.key.partition() == -1) + row.key.partition(cctx.affinity().partition(row.key)); - if (pendingEntries.removex(row)) { - if (obsoleteVer == null) - obsoleteVer = ctx.versions().next(); + assert row.key != null && row.link != 0 && row.expireTime != 0 : row; - c.apply(cctx.cache().entryEx(row.key), obsoleteVer); - } + if (pendingEntries.removex(row)) { + if (obsoleteVer == null) + obsoleteVer = ctx.versions().next(); + + GridCacheEntryEx entry = cctx.cache().entryEx(row.key); - cleared++; + if (entry != null) + c.apply(entry, obsoleteVer); } - while (cur.next()); - } - finally { - cctx.shared().database().checkpointReadUnlock(); + + cleared++; } + while (cur.next()); + } + finally { + busyLock.leaveBusy(); } return false; @@ -1395,15 +1418,15 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager if (oldRow != null) { assert oldRow.link() != 0 : oldRow; - if (pendingEntries != null && oldRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + if (pendingTree() != null && oldRow.expireTime() != 0) + pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); if (newRow.link() != oldRow.link()) rowStore.removeRow(oldRow.link()); } - if (pendingEntries != null && expireTime != 0) { - pendingEntries.putx(new PendingRow(cacheId, expireTime, newRow.link())); + if (pendingTree() != null && expireTime != 0) { + pendingTree().putx(new PendingRow(cacheId, expireTime, newRow.link())); hasPendingEntries = true; } @@ -1444,8 +1467,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId : "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "]."; - if (pendingEntries != null && oldRow.expireTime() != 0) - pendingEntries.removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); + if (pendingTree() != null && oldRow.expireTime() != 0) + pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); decrementSize(cctx.cacheId()); } @@ -1543,7 +1566,6 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public void clear(int cacheId) throws IgniteCheckedException { assert cacheId != CU.UNDEFINED_CACHE_ID; - assert ctx.database().checkpointLockIsHeldByThread(); if (cacheSize(cacheId) == 0) return; @@ -1624,6 +1646,11 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } } + /** {@inheritDoc} */ + @Override public PendingEntriesTree pendingTree() { + return pendingEntries; + } + /** * @param cctx Cache context. * @param key Key. @@ -1676,5 +1703,4 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return 0; } } - } http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index be74eff..a199f6c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -47,7 +47,6 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; -import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -57,9 +56,9 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.util.deque.FastSizeDeque; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import org.apache.ignite.util.deque.FastSizeDeque; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; @@ -342,9 +341,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return {@code True} if partition is empty. */ public boolean isEmpty() { - if (grp.allowFastEviction()) - return internalSize() == 0; - return store.fullSize() == 0 && internalSize() == 0; } @@ -981,78 +977,76 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements long cleared = 0; - if (!grp.allowFastEviction()) { - CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; + CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; - try { - GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id); + try { + GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id); - while (it0.hasNext()) { - ctx.database().checkpointReadLock(); + while (it0.hasNext()) { + ctx.database().checkpointReadLock(); - try { - CacheDataRow row = it0.next(); - - // Do not clear fresh rows in case of single partition clearing. - if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear)) - continue; - - if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) - hld = cacheMapHolder(ctx.cacheContext(row.cacheId())); - - assert hld != null; - - GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent( - hld, - hld.cctx, - grp.affinity().lastVersion(), - row.key(), - true, - false); - - if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { - removeEntry(cached); - - if (rec && !hld.cctx.config().isEventsDisabled()) { - hld.cctx.events().addEvent(cached.partition(), - cached.key(), - ctx.localNodeId(), - (IgniteUuid)null, - null, - EVT_CACHE_REBALANCE_OBJECT_UNLOADED, - null, - false, - cached.rawGet(), - cached.hasValue(), - null, - null, - null, - false); - } - - cleared++; + try { + CacheDataRow row = it0.next(); + + // Do not clear fresh rows in case of single partition clearing. + if (row.version().compareTo(clearVer) >= 0 && (state() == MOVING && clear)) + continue; + + if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) + hld = cacheMapHolder(ctx.cacheContext(row.cacheId())); + + assert hld != null; + + GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent( + hld, + hld.cctx, + grp.affinity().lastVersion(), + row.key(), + true, + false); + + if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { + removeEntry(cached); + + if (rec && !hld.cctx.config().isEventsDisabled()) { + hld.cctx.events().addEvent(cached.partition(), + cached.key(), + ctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_REBALANCE_OBJECT_UNLOADED, + null, + false, + cached.rawGet(), + cached.hasValue(), + null, + null, + null, + false); } - } - catch (GridDhtInvalidPartitionException e) { - assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; - break; // Partition is already concurrently cleared and evicted. - } - finally { - ctx.database().checkpointReadUnlock(); + cleared++; } } - } - catch (NodeStoppingException e) { - if (log.isDebugEnabled()) - log.debug("Failed to get iterator for evicted partition: " + id); + catch (GridDhtInvalidPartitionException e) { + assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; - throw e; - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to get iterator for evicted partition: " + id, e); + break; // Partition is already concurrently cleared and evicted. + } + finally { + ctx.database().checkpointReadUnlock(); + } } } + catch (NodeStoppingException e) { + if (log.isDebugEnabled()) + log.debug("Failed to get iterator for evicted partition: " + id); + + throw e; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get iterator for evicted partition: " + id, e); + } return cleared; } @@ -1406,37 +1400,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * Recreate cache data store after successful clearing and allowed fast eviction. - */ - private void recreateCacheDataStore() { - assert grp.offheap() instanceof GridCacheOffheapManager; - - try { - CacheDataStore store0 = store; - - store = ((GridCacheOffheapManager) grp.offheap()).recreateCacheDataStore(store0); - - // Inject row cache cleaner on store creation - // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group - if (ctx.kernalContext().query().moduleEnabled()) { - GridQueryRowCacheCleaner cleaner = ctx.kernalContext().query().getIndexing() - .rowCacheCleaner(grp.groupId()); - - if (store != null && cleaner != null) - store.setRowCacheCleaner(cleaner); - } - } catch (IgniteCheckedException e) { - finish(e); - } - } - - /** * Successfully finishes the future. */ public void finish() { - if (state() == MOVING && clear && grp.allowFastEviction()) - recreateCacheDataStore(); - synchronized (this) { onDone(); finished = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java index cc0542c..866c513 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsStateValidator.java @@ -95,6 +95,7 @@ public class GridDhtPartitionsStateValidator { // Validate cache sizes. result = validatePartitionsSizes(top, messages, ignoringNodes); + if (!result.isEmpty()) throw new IgniteCheckedException("Partitions cache sizes are inconsistent for " + fold(topVer, result)); } http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 39f4ed1..1b79b76 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -136,6 +136,13 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte /** */ private static final IgniteProductVersion FORCE_AFF_REASSIGNMENT_SINCE = IgniteProductVersion.fromString("2.4.3"); + /** + * This may be useful when per-entry (not per-cache based) partition policy is in use. + * See {@link IgniteSystemProperties#IGNITE_SKIP_PARTITION_SIZE_VALIDATION} for details. + * Default value is {@code false}. + */ + private static final boolean SKIP_PARTITION_SIZE_VALIDATION = Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION); + /** */ @GridToStringExclude private final Object mux = new Object(); @@ -2755,13 +2762,16 @@ public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapte grpCtx.topology() : cctx.exchange().clientTopology(grpId, events().discoveryCache()); - // Do not validate read or write through caches or caches with disabled rebalance. + // Do not validate read or write through caches or caches with disabled rebalance + // or ExpiryPolicy is set or validation is disabled. if (grpCtx == null || grpCtx.config().isReadThrough() || grpCtx.config().isWriteThrough() || grpCtx.config().getCacheStoreFactory() != null || grpCtx.config().getRebalanceDelay() == -1 - || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE) + || grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE + || grpCtx.config().getExpiryPolicyFactory() == null + || SKIP_PARTITION_SIZE_VALIDATION) continue; try { http://git-wip-us.apache.org/repos/asf/ignite/blob/89c77573/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index 5feaa25..d7cc623 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId; @@ -49,6 +50,8 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -56,6 +59,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartit import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator; import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeListImpl; +import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange; @@ -64,6 +68,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV2; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; @@ -71,10 +76,13 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointe import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; +import org.apache.ignite.internal.processors.cache.tree.PendingRow; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; import org.apache.ignite.internal.util.lang.GridCursor; +import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; @@ -95,7 +103,14 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple private ReuseListImpl reuseList; /** {@inheritDoc} */ + @Override protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException { + // No-op. Per-partition PendingTree should be used. + } + + /** {@inheritDoc} */ @Override protected void initDataStructures() throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + Metas metas = getOrAllocateCacheMetas(); RootPage reuseListRoot = metas.reuseListRoot; @@ -122,29 +137,12 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple ((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this); } - /** {@inheritDoc} */ - @Override public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { - if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { - ctx.database().checkpointReadLock(); - - try { - final String name = "PendingEntries"; - - RootPage pendingRootPage = indexStorage.getOrAllocateForTree(name); - - pendingEntries = new PendingEntriesTree( - grp, - name, - grp.dataRegion().pageMemory(), - pendingRootPage.pageId().pageId(), - reuseList, - pendingRootPage.isAllocated() - ); - } - finally { - ctx.database().checkpointReadUnlock(); - } - } + /** + * Get internal IndexStorage. + * See {@link UpgradePendingTreeToPerPartitionTask} for details. + */ + public IndexStorage getIndexStorage() { + return indexStorage; } /** {@inheritDoc} */ @@ -218,8 +216,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple int grpId = grp.groupId(); long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId()); - long partMetaPage = pageMem.acquirePage(grpId, partMetaId); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { long partMetaPageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); @@ -274,7 +272,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (needSnapshot) { pageCnt = this.ctx.pageStore().pages(grpId, store.partId()); - io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0: pageCnt); + io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt); if (saveMeta) { saveMeta(ctx); @@ -285,7 +283,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (state == OWNING) { assert part != null; - if(!addPartition( + if (!addPartition( part, ctx.partitionStatMap(), partMetaPageAddr, @@ -295,8 +293,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple this.ctx.pageStore().pages(grpId, store.partId()), store.fullSize() )) - U.warn(log,"Partition was concurrently evicted grpId=" + grpId + - ", partitionId=" + part.id()); + U.warn(log, "Partition was concurrently evicted grpId=" + grpId + + ", partitionId=" + part.id()); } else if (state == MOVING || state == RENTING) { if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) { @@ -333,7 +331,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } } else if (needSnapshot) - tryAddEmptyPartitionToSnapshot(store, ctx);; + tryAddEmptyPartitionToSnapshot(store, ctx); } else if (needSnapshot) tryAddEmptyPartitionToSnapshot(store, ctx); @@ -350,8 +348,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple private void tryAddEmptyPartitionToSnapshot(CacheDataStore store, Context ctx) { if (getPartition(store).state() == OWNING) { ctx.partitionStatMap().put( - new GroupPartitionId(grp.groupId(), store.partId()), - new PagesAllocationRange(0, 0)); + new GroupPartitionId(grp.groupId(), store.partId()), + new PagesAllocationRange(0, 0)); } } @@ -362,7 +360,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple */ private GridDhtLocalPartition getPartition(CacheDataStore store) { return grp.topology().localPartition(store.partId(), - AffinityTopologyVersion.NONE, false, true); + AffinityTopologyVersion.NONE, false, true); } /** @@ -385,7 +383,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple long nextId = cntrsPageId; - while (true){ + while (true) { final long curId = nextId; final long curPage = pageMem.acquirePage(grpId, curId); @@ -542,19 +540,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple * @param currAllocatedPageCnt total number of pages allocated for partition <code>[partition, grpId]</code> */ private static boolean addPartition( - GridDhtLocalPartition part, - final PartitionAllocationMap map, - final long metaPageAddr, - final PageMetaIO io, - final int grpId, - final int partId, - final int currAllocatedPageCnt, - final long partSize + GridDhtLocalPartition part, + final PartitionAllocationMap map, + final long metaPageAddr, + final PageMetaIO io, + final int grpId, + final int partId, + final int currAllocatedPageCnt, + final long partSize ) { if (part != null) { boolean reserved = part.reserve(); - if(!reserved) + if (!reserved) return false; } else @@ -596,43 +594,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } } - /** - * Destroys given {@code store} and creates new with the same update counters as in given. - * - * @param store Store to destroy. - * @return New cache data store. - * @throws IgniteCheckedException If failed. - */ - public CacheDataStore recreateCacheDataStore(CacheDataStore store) throws IgniteCheckedException { - long updCounter = store.updateCounter(); - long initUpdCounter = store.initialUpdateCounter(); - - int p = store.partId(); - - PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory(); - - int tag = pageMemory.invalidate(grp.groupId(), p); - - ctx.pageStore().onPartitionDestroyed(grp.groupId(), p, tag); - - CacheDataStore store0; - - partStoreLock.lock(p); - - try { - store0 = createCacheDataStore0(p); - store0.updateCounter(updCounter); - store0.updateInitialCounter(initUpdCounter); - - partDataStores.put(p, store0); - } - finally { - partStoreLock.unlock(p); - } - - return store0; - } - /** {@inheritDoc} */ @Override public void onPartitionCounterUpdated(int part, long cntr) { CacheDataStore store = partDataStores.get(part); @@ -743,7 +704,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple return new Metas( new RootPage(new FullPageId(metastoreRoot, grpId), allocated), - new RootPage(new FullPageId(reuseListRoot, grpId), allocated)); + new RootPage(new FullPageId(reuseListRoot, grpId), allocated), + null); } finally { pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated); @@ -787,6 +749,47 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple return iterator; } + /** {@inheritDoc} */ + @Override public boolean expire( + GridCacheContext cctx, + IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, + int amount + ) throws IgniteCheckedException { + assert !cctx.isNear() : cctx.name(); + + if (!hasPendingEntries) + return false; + + if (!busyLock.enterBusy()) + return false; + + try { + int cleared = 0; + + for (CacheDataStore store : cacheDataStores()) { + cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, amount - cleared); + + if (amount != -1 && cleared >= amount) + return true; + } + } + finally { + busyLock.leaveBusy(); + } + + return false; + } + + /** {@inheritDoc} */ + @Override public long expiredSize() throws IgniteCheckedException { + long size = 0; + + for (CacheDataStore store : cacheDataStores()) + size += ((GridCacheDataStore)store).expiredSize(); + + return size; + } + /** * Calculates free space of all partition data stores - number of bytes available for use in allocated pages. * @@ -1098,13 +1101,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple @GridToStringInclude private final RootPage treeRoot; + /** */ + @GridToStringInclude + private final RootPage pendingTreeRoot; + /** * @param treeRoot Metadata storage root. * @param reuseListRoot Reuse list root. */ - Metas(RootPage treeRoot, RootPage reuseListRoot) { + Metas(RootPage treeRoot, RootPage reuseListRoot, RootPage pendingTreeRoot) { this.treeRoot = treeRoot; this.reuseListRoot = reuseListRoot; + this.pendingTreeRoot = pendingTreeRoot; } /** {@inheritDoc} */ @@ -1116,7 +1124,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** * */ - private class GridCacheDataStore implements CacheDataStore { + public class GridCacheDataStore implements CacheDataStore { /** */ private final int partId; @@ -1127,6 +1135,9 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple private volatile CacheFreeListImpl freeList; /** */ + private PendingEntriesTree pendingTree; + + /** */ private volatile CacheDataStore delegate; /** */ @@ -1164,11 +1175,10 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple return null; } - IgniteCacheDatabaseSharedManager dbMgr = ctx.database(); - - dbMgr.checkpointReadLock(); - if (init.compareAndSet(false, true)) { + IgniteCacheDatabaseSharedManager dbMgr = ctx.database(); + + dbMgr.checkpointReadLock(); try { Metas metas = getOrAllocatePartitionMetas(); @@ -1183,6 +1193,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple ctx.wal(), reuseRoot.pageId().pageId(), reuseRoot.isAllocated()) { + /** {@inheritDoc} */ @Override protected long allocatePageNoReuse() throws IgniteCheckedException { assert grp.shared().database().checkpointLockIsHeldByThread(); @@ -1201,6 +1212,24 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple rowStore, treeRoot.pageId().pageId(), treeRoot.isAllocated()) { + /** {@inheritDoc} */ + @Override protected long allocatePageNoReuse() throws IgniteCheckedException { + assert grp.shared().database().checkpointLockIsHeldByThread(); + + return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA); + } + }; + + RootPage pendingTreeRoot = metas.pendingTreeRoot; + + final PendingEntriesTree pendingTree0 = new PendingEntriesTree( + grp, + "PendingEntries-" + partId, + grp.dataRegion().pageMemory(), + pendingTreeRoot.pageId().pageId(), + reuseList, + pendingTreeRoot.isAllocated()) { + /** {@inheritDoc} */ @Override protected long allocatePageNoReuse() throws IgniteCheckedException { assert grp.shared().database().checkpointLockIsHeldByThread(); @@ -1210,7 +1239,17 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); - delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree); + delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree) { + /** {@inheritDoc} */ + @Override public PendingEntriesTree pendingTree() { + return pendingTree0; + } + }; + + pendingTree = pendingTree0; + + if (!hasPendingEntries && pendingTree0.size() > 0) + hasPendingEntries = true; int grpId = grp.groupId(); long partMetaId = pageMem.partitionMetaPageId(grpId, partId); @@ -1258,8 +1297,6 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple } } else { - dbMgr.checkpointReadUnlock(); - U.await(latch); delegate0 = delegate; @@ -1280,13 +1317,15 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple int grpId = grp.groupId(); long partMetaId = pageMem.partitionMetaPageId(grpId, partId); + long partMetaPage = pageMem.acquirePage(grpId, partMetaId); try { boolean allocated = false; - long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); + boolean pendingTreeAllocated = false; + long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); try { - long treeRoot, reuseListRoot; + long treeRoot, reuseListRoot, pendingTreeRoot; // Initialize new page. if (PageIO.getType(pageAddr) != PageIO.T_PART_META) { @@ -1296,22 +1335,18 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); + pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA; assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA; + assert PageIdUtils.flag(pendingTreeRoot) == PageMemory.FLAG_DATA; io.setTreeRoot(pageAddr, treeRoot); io.setReuseListRoot(pageAddr, reuseListRoot); + io.setPendingTreeRoot(pageAddr, pendingTreeRoot); if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) - wal.log(new MetaPageInitRecord( - grpId, - partMetaId, - io.getType(), - io.getVersion(), - treeRoot, - reuseListRoot - )); + wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, pageMem.pageSize())); allocated = true; } @@ -1321,6 +1356,33 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple treeRoot = io.getTreeRoot(pageAddr); reuseListRoot = io.getReuseListRoot(pageAddr); + int pageVersion = PagePartitionMetaIO.getVersion(pageAddr); + + if (pageVersion < 2) { + assert pageVersion == 1; + + if (log.isDebugEnabled()) + log.info("Upgrade partition meta page version: [part=" + partId + + ", grpId=" + grpId + ", oldVer=" + pageVersion + + ", newVer=" + io.getVersion() + ); + + io = PagePartitionMetaIO.VERSIONS.latest(); + + ((PagePartitionMetaIOV2)io).upgradePage(pageAddr); + + pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); + + io.setPendingTreeRoot(pageAddr, pendingTreeRoot); + + if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) + wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, pageMem.pageSize())); + + pendingTreeAllocated = true; + } + else + pendingTreeRoot = io.getPendingTreeRoot(pageAddr); + if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA) throw new StorageException("Wrong tree root page id flag: treeRoot=" + U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId); @@ -1328,14 +1390,19 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA) throw new StorageException("Wrong reuse list root page id flag: reuseListRoot=" + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId); + + if (PageIdUtils.flag(pendingTreeRoot) != PageMemory.FLAG_DATA) + throw new StorageException("Wrong pending tree root page id flag: reuseListRoot=" + + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId); } return new Metas( new RootPage(new FullPageId(treeRoot, grpId), allocated), - new RootPage(new FullPageId(reuseListRoot, grpId), allocated)); + new RootPage(new FullPageId(reuseListRoot, grpId), allocated), + new RootPage(new FullPageId(pendingTreeRoot, grpId), allocated || pendingTreeAllocated)); } finally { - pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated); + pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated || pendingTreeAllocated); } } finally { @@ -1485,6 +1552,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple long expireTime, @Nullable CacheDataRow oldRow ) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + CacheDataStore delegate = init0(false); delegate.update(cctx, key, val, ver, expireTime, oldRow); @@ -1498,6 +1567,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple GridCacheVersion ver, long expireTime, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + CacheDataStore delegate = init0(false); return delegate.createRow(cctx, key, val, ver, expireTime, oldRow); @@ -1506,6 +1577,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + CacheDataStore delegate = init0(false); delegate.invoke(cctx, key, c); @@ -1514,6 +1587,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException { + assert ctx.database().checkpointLockIsHeldByThread(); + CacheDataStore delegate = init0(false); delegate.remove(cctx, key, partId); @@ -1583,10 +1658,142 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple /** {@inheritDoc} */ @Override public void clear(int cacheId) throws IgniteCheckedException { - CacheDataStore delegate = init0(true); + CacheDataStore delegate0 = init0(true); - if (delegate != null) - delegate.clear(cacheId); + if (delegate0 == null) + return; + + ctx.database().checkpointReadLock(); + try { + // Clear persistent pendingTree + if (pendingTree != null) { + PendingRow row = new PendingRow(cacheId); + + GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY); + + while (cursor.next()) { + PendingRow row0 = cursor.get(); + + assert row0.link != 0 : row; + + boolean res = pendingTree.removex(row0); + + assert res; + } + } + + delegate0.clear(cacheId); + } + finally { + ctx.database().checkpointReadUnlock(); + } + } + + /** + * Gets the number of entries pending expire. + * + * @return Number of pending entries. + * @throws IgniteCheckedException If failed to get number of pending entries. + */ + public long expiredSize() throws IgniteCheckedException { + CacheDataStore delegate0 = init0(true); + + return delegate0 == null ? 0 : pendingTree.size(); + } + + /** + * Removes expired entries from data store. + * + * @param cctx Cache context. + * @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details. + * @param amount Limit of processed entries by single call, {@code -1} for no limit. + * @return {@code True} if unprocessed expired entries remains. + * @throws IgniteCheckedException If failed. + */ + public int purgeExpired(GridCacheContext cctx, + IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, + int amount) throws IgniteCheckedException { + CacheDataStore delegate0 = init0(true); + + if (delegate0 == null || pendingTree == null) + return 0; + + GridDhtLocalPartition part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false); + + // Skip non-owned partitions. + if (part == null || part.state() != OWNING || pendingTree.size() == 0) + return 0; + + cctx.shared().database().checkpointReadLock(); + try { + if (!part.reserve()) + return 0; + + try { + if (part.state() != OWNING) + return 0; + + long now = U.currentTimeMillis(); + + GridCursor<PendingRow> cur; + + if (grp.sharedGroup()) + cur = pendingTree.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0)); + else + cur = pendingTree.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0)); + + if (!cur.next()) + return 0; + + GridCacheVersion obsoleteVer = null; + + int cleared = 0; + + do { + PendingRow row = cur.get(); + + if (amount != -1 && cleared > amount) + return cleared; + + assert row.key != null && row.link != 0 && row.expireTime != 0 : row; + + row.key.partition(partId); + + if (pendingTree.removex(row)) { + if (obsoleteVer == null) + obsoleteVer = ctx.versions().next(); + + GridCacheEntryEx e1 = cctx.cache().entryEx(row.key); + + if (e1 != null) + c.apply(e1, obsoleteVer); + } + + cleared++; + } + while (cur.next()); + + return cleared; + } + finally { + part.release(); + } + } + finally { + cctx.shared().database().checkpointReadUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public PendingEntriesTree pendingTree() { + try { + CacheDataStore delegate0 = init0(true); + + return delegate0 == null ? null : pendingTree; + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } } }