http://git-wip-us.apache.org/repos/asf/ignite/blob/cd4d0400/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySeveralRestartsTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySeveralRestartsTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySeveralRestartsTest.java deleted file mode 100644 index 9b0231d..0000000 --- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/IgniteWalRecoverySeveralRestartsTest.java +++ /dev/null @@ -1,350 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.database.db.file; - -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.cache.CacheAtomicityMode; -import org.apache.ignite.cache.CacheRebalanceMode; -import org.apache.ignite.cache.CacheWriteSynchronizationMode; -import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; -import org.apache.ignite.cache.query.annotations.QuerySqlField; -import org.apache.ignite.configuration.BinaryConfiguration; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.MemoryConfiguration; -import org.apache.ignite.configuration.MemoryPolicyConfiguration; -import org.apache.ignite.configuration.PersistentStoreConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.processors.cache.database.wal.FileWriteAheadLogManager; -import org.apache.ignite.internal.util.typedef.internal.S; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class IgniteWalRecoverySeveralRestartsTest extends GridCommonAbstractTest { - /** */ - public static final int PAGE_SIZE = 1024; - - /** */ - private static final int KEYS_COUNT = 100_000; - - /** */ - private static final int LARGE_KEYS_COUNT = 5_000; - - /** */ - private static final Random rnd = new Random(System.currentTimeMillis()); - - /** */ - private String cacheName = "test"; - - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return 3600_000; - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(cacheName); - - ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - ccfg.setRebalanceMode(CacheRebalanceMode.NONE); - ccfg.setIndexedTypes(Integer.class, IndexedObject.class); - ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - ccfg.setAffinity(new RendezvousAffinityFunction(false, 64 * 4)); // 64 per node - ccfg.setReadFromBackup(true); - - cfg.setCacheConfiguration(ccfg); - - MemoryConfiguration dbCfg = new MemoryConfiguration(); - - dbCfg.setPageSize(PAGE_SIZE); - - MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration(); - - memPlcCfg.setName("dfltMemPlc"); - memPlcCfg.setInitialSize(500 * 1024 * 1024); - memPlcCfg.setMaxSize(500 * 1024 * 1024); - - dbCfg.setMemoryPolicies(memPlcCfg); - dbCfg.setDefaultMemoryPolicyName("dfltMemPlc"); - - cfg.setMemoryConfiguration(dbCfg); - - PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration(); - - cfg.setPersistentStoreConfiguration(pCfg); - - cfg.setMarshaller(null); - - BinaryConfiguration binCfg = new BinaryConfiguration(); - - binCfg.setCompactFooter(false); - - cfg.setBinaryConfiguration(binCfg); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); - - System.setProperty(FileWriteAheadLogManager.IGNITE_PDS_WAL_MODE, "LOG_ONLY"); - - super.beforeTest(); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - System.clearProperty(FileWriteAheadLogManager.IGNITE_PDS_WAL_MODE); - - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); - } - - /** - * @throws Exception if failed. - */ - public void testWalRecoverySeveralRestarts() throws Exception { - try { - IgniteEx ignite = startGrid(1); - - Random locRandom = ThreadLocalRandom.current(); - - try (IgniteDataStreamer<Integer, IndexedObject> dataLdr = ignite.dataStreamer(cacheName)) { - for (int i = 0; i < KEYS_COUNT; ++i) { - if (i % (KEYS_COUNT / 100) == 0) - info("Loading " + i * 100 / KEYS_COUNT + "%"); - - dataLdr.addData(i, new IndexedObject(i)); - } - } - - int size = ignite.cache(cacheName).size(); - - for (int restartCnt = 0; restartCnt < 5; ++restartCnt) { - stopGrid(1, true); - - info("Restart #" + restartCnt); - U.sleep(500); - - ignite = startGrid(1); - - IgniteCache<Integer, IndexedObject> cache = ignite.cache(cacheName); - - assertEquals(size, cache.size()); - - info("Restart #" + restartCnt); - - for (int i = 0; i < KEYS_COUNT / 100; ++i) { - assertNotNull(cache.get(locRandom.nextInt(KEYS_COUNT / 100))); - - cache.put(locRandom.nextInt(KEYS_COUNT / 100), new IndexedObject(locRandom.nextInt(KEYS_COUNT / 100))); - } - - cache.put(KEYS_COUNT + restartCnt, new IndexedObject(KEYS_COUNT + restartCnt)); - - // Check recovery for partition meta pages. - size = cache.size(); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception if failed. - */ - public void testWalRecoveryWithDynamicCache() throws Exception { - try { - IgniteEx ignite = startGrid(1); - - CacheConfiguration<Integer, IndexedObject> dynCacheCfg = new CacheConfiguration<>(); - - dynCacheCfg.setName("dyncache"); - dynCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - dynCacheCfg.setRebalanceMode(CacheRebalanceMode.NONE); - dynCacheCfg.setIndexedTypes(Integer.class, IndexedObject.class); - dynCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dynCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 64 * 4)); // 64 per node - dynCacheCfg.setReadFromBackup(true); - - ignite.getOrCreateCache(dynCacheCfg); - - try (IgniteDataStreamer<Integer, IndexedObject> dataLdr = ignite.dataStreamer("dyncache")) { - for (int i = 0; i < KEYS_COUNT; ++i) { - if (i % (KEYS_COUNT / 100) == 0) - info("Loading " + i * 100 / KEYS_COUNT + "%"); - - dataLdr.addData(i, new IndexedObject(i)); - } - } - - for (int restartCnt = 0; restartCnt < 5; ++restartCnt) { - stopGrid(1, true); - - info("Restart #" + restartCnt); - U.sleep(500); - - ignite = startGrid(1); - - ThreadLocalRandom locRandom = ThreadLocalRandom.current(); - - IgniteCache<Integer, IndexedObject> cache = ignite.getOrCreateCache(dynCacheCfg); - - for (int i = 0; i < KEYS_COUNT; ++i) - assertNotNull(cache.get(locRandom.nextInt(KEYS_COUNT))); - } - } - finally { - stopAllGrids(); - } - } - - /** - * @throws Exception if failed. - */ - public void testWalRecoveryWithDynamicCacheLargeObjects() throws Exception { - try { - IgniteEx ignite = startGrid(1); - - CacheConfiguration<Integer, IndexedObject> dynCacheCfg = new CacheConfiguration<>(); - - dynCacheCfg.setName("dyncache"); - dynCacheCfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); - dynCacheCfg.setRebalanceMode(CacheRebalanceMode.NONE); - dynCacheCfg.setIndexedTypes(Integer.class, IndexedObject.class); - dynCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - dynCacheCfg.setAffinity(new RendezvousAffinityFunction(false, 64 * 4)); // 64 per node - dynCacheCfg.setReadFromBackup(true); - - ignite.getOrCreateCache(dynCacheCfg); - - try (IgniteDataStreamer<Integer, IndexedObject> dataLdr = ignite.dataStreamer("dyncache")) { - for (int i = 0; i < LARGE_KEYS_COUNT; ++i) { - if (i % (LARGE_KEYS_COUNT / 100) == 0) - info("Loading " + i * 100 / LARGE_KEYS_COUNT + "%"); - - IndexedObject obj = new IndexedObject(i); - - obj.payload = new byte[PAGE_SIZE + 2]; - - dataLdr.addData(i, obj); - } - } - - for (int restartCnt = 0; restartCnt < 5; ++restartCnt) { - stopGrid(1, true); - - info("Restart #" + restartCnt); - U.sleep(500); - - ignite = startGrid(1); - - ThreadLocalRandom locRandom = ThreadLocalRandom.current(); - - IgniteCache<Integer, IndexedObject> cache = ignite.getOrCreateCache(dynCacheCfg); - - for (int i = 0; i < LARGE_KEYS_COUNT; ++i) { - IndexedObject val = cache.get(locRandom.nextInt(LARGE_KEYS_COUNT)); - - assertNotNull(val); - - assertEquals(PAGE_SIZE + 2, val.payload.length); - } - } - } - finally { - stopAllGrids(); - } - } - - /** - * - */ - private static class IndexedObject { - /** */ - @QuerySqlField(index = true) - private int iVal; - - /** */ - @QuerySqlField(index = true) - private String strVal0; - - /** */ - @QuerySqlField(index = true) - private String strVal1; - - /** */ - private byte[] payload; - - /** - * @param iVal Integer value. - */ - private IndexedObject(int iVal) { - this.iVal = iVal; - - strVal0 = "String value #0 " + iVal + " " + GridTestUtils.randomString(rnd, 256); - strVal1 = GridTestUtils.randomString(rnd, 256); - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - IndexedObject obj = (IndexedObject)o; - - if (iVal != obj.iVal) - return false; - - if (strVal0 != null ? !strVal0.equals(obj.strVal0) : obj.strVal0 != null) - return false; - - return strVal1 != null ? strVal1.equals(obj.strVal1) : obj.strVal1 == null; - - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - int res = iVal; - - res = 31 * res + (strVal0 != null ? strVal0.hashCode() : 0); - res = 31 * res + (strVal1 != null ? strVal1.hashCode() : 0); - - return res; - } - - /** {@inheritDoc} */ - @Override public String toString() { - return S.toString(IndexedObject.class, this); - } - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/cd4d0400/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/PageStoreCheckpointSimulationSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/PageStoreCheckpointSimulationSelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/PageStoreCheckpointSimulationSelfTest.java deleted file mode 100644 index 0addcb3..0000000 --- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/PageStoreCheckpointSimulationSelfTest.java +++ /dev/null @@ -1,1002 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.database.db.file; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.nio.ByteOrder; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheRebalanceMode; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.MemoryConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.PersistentStoreConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.pagemem.PageIdAllocator; -import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.pagemem.PageUtils; -import org.apache.ignite.internal.processors.cache.database.GridCacheDatabaseSharedManager; -import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryEx; -import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryImpl; -import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO; -import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; -import org.apache.ignite.internal.processors.cache.database.tree.io.TrackingPageIO; -import org.apache.ignite.internal.processors.cache.database.wal.FileWriteAheadLogManager; -import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; -import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; -import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; -import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; -import org.apache.ignite.internal.pagemem.wal.WALIterator; -import org.apache.ignite.internal.pagemem.wal.WALPointer; -import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; -import org.apache.ignite.internal.pagemem.wal.record.DataEntry; -import org.apache.ignite.internal.pagemem.wal.record.DataRecord; -import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; -import org.apache.ignite.internal.pagemem.wal.record.WALRecord; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.CacheObjectContext; -import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * - */ -public class PageStoreCheckpointSimulationSelfTest extends GridCommonAbstractTest { - /** */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** */ - private static final int TOTAL_PAGES = 1000; - - /** */ - private static final boolean VERBOSE = false; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - CacheConfiguration ccfg = new CacheConfiguration("partitioned"); - - ccfg.setRebalanceMode(CacheRebalanceMode.NONE); - - cfg.setCacheConfiguration(ccfg); - - MemoryConfiguration dbCfg = new MemoryConfiguration(); - - cfg.setMemoryConfiguration(dbCfg); - - PersistentStoreConfiguration pCfg = new PersistentStoreConfiguration(); - - pCfg.setCheckpointingFrequency(500); - - cfg.setPersistentStoreConfiguration(pCfg); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - return cfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTest() throws Exception { - stopAllGrids(); - - deleteWorkFiles(); - - System.setProperty(FileWriteAheadLogManager.IGNITE_PDS_WAL_MODE, "LOG_ONLY"); - - System.setProperty(FileWriteAheadLogManager.IGNITE_PDS_WAL_ALWAYS_WRITE_FULL_PAGES, "true"); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - System.clearProperty(FileWriteAheadLogManager.IGNITE_PDS_WAL_MODE); - - System.clearProperty(FileWriteAheadLogManager.IGNITE_PDS_WAL_ALWAYS_WRITE_FULL_PAGES); - - stopAllGrids(); - - deleteWorkFiles(); - } - - /** - * @throws Exception if failed. - */ - public void testCheckpointSimulationMultiThreaded() throws Exception { - IgniteEx ig = startGrid(0); - - GridCacheSharedContext<Object, Object> shared = ig.context().cache().context(); - - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)shared.database(); - - IgnitePageStoreManager pageStore = shared.pageStore(); - - U.sleep(1_000); - - // Disable integrated checkpoint thread. - dbMgr.enableCheckpoints(false).get(); - - // Must put something in partition 0 in order to initialize meta page. - // Otherwise we will violate page store integrity rules. - ig.cache("partitioned").put(0, 0); - - PageMemory mem = shared.database().memoryPolicy(null).pageMemory(); - - IgniteBiTuple<Map<FullPageId, Integer>, WALPointer> res; - - try { - res = runCheckpointing(ig, (PageMemoryImpl)mem, pageStore, shared.wal(), - shared.cache().cache("partitioned").context().cacheId()); - } - catch (Throwable th) { - log().error("Error while running checkpointing", th); - - throw th; - } - finally { - dbMgr.enableCheckpoints(true).get(); - - stopAllGrids(false); - } - - ig = startGrid(0); - - shared = ig.context().cache().context(); - - dbMgr = (GridCacheDatabaseSharedManager)shared.database(); - - dbMgr.enableCheckpoints(false).get(); - - mem = shared.database().memoryPolicy(null).pageMemory(); - - verifyReads(res.get1(), mem, res.get2(), shared.wal()); - } - - /** - * @throws Exception if failed. - */ - public void testGetForInitialWrite() throws Exception { - IgniteEx ig = startGrid(0); - - GridCacheSharedContext<Object, Object> shared = ig.context().cache().context(); - - int cacheId = shared.cache().cache("partitioned").context().cacheId(); - - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)shared.database(); - - // Disable integrated checkpoint thread. - dbMgr.enableCheckpoints(false); - - PageMemory mem = shared.database().memoryPolicy(null).pageMemory(); - - IgniteWriteAheadLogManager wal = shared.wal(); - - WALPointer start = wal.log(new CheckpointRecord(null, false)); - - final FullPageId[] initWrites = new FullPageId[10]; - - ig.context().cache().context().database().checkpointReadLock(); - - try { - for (int i = 0; i < initWrites.length; i++) - initWrites[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); - - // Check getForInitialWrite methods. - for (FullPageId fullId : initWrites) { - long page = mem.acquirePage(fullId.cacheId(), fullId.pageId()); - try { - long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page); - - try { - DataPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.pageSize()); - - for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) - PageUtils.putByte(pageAddr, i, (byte)0xAB); - } - finally { - mem.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, true); - } - } - finally { - mem.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - - wal.fsync(null); - } - finally { - ig.context().cache().context().database().checkpointReadUnlock(); - stopAllGrids(false); - } - - ig = startGrid(0); - - shared = ig.context().cache().context(); - - dbMgr = (GridCacheDatabaseSharedManager)shared.database(); - - dbMgr.enableCheckpoints(false); - - wal = shared.wal(); - - try (WALIterator it = wal.replay(start)) { - it.nextX(); - - for (FullPageId initialWrite : initWrites) { - IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); - - assertTrue(String.valueOf(tup.get2()), tup.get2() instanceof PageSnapshot); - - PageSnapshot snap = (PageSnapshot)tup.get2(); - - FullPageId actual = snap.fullPageId(); - - //there are extra tracking pages, skip them - if (TrackingPageIO.VERSIONS.latest().trackingPageFor(actual.pageId(), mem.pageSize()) == actual.pageId()) { - tup = it.nextX(); - - assertTrue(tup.get2() instanceof PageSnapshot); - - actual = ((PageSnapshot)tup.get2()).fullPageId(); - } - - assertEquals(initialWrite, actual); - } - } - } - - /** - * @throws Exception if failed. - */ - public void testDataWalEntries() throws Exception { - IgniteEx ig = startGrid(0); - - GridCacheSharedContext<Object, Object> sharedCtx = ig.context().cache().context(); - GridCacheContext<Object, Object> cctx = sharedCtx.cache().cache("partitioned").context(); - - GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); - IgniteWriteAheadLogManager wal = sharedCtx.wal(); - - assertTrue(wal.isAlwaysWriteFullPages()); - - db.enableCheckpoints(false); - - final int cnt = 10; - - List<DataEntry> entries = new ArrayList<>(cnt); - - for (int i = 0; i < cnt; i++) { - GridCacheOperation op = i % 2 == 0 ? GridCacheOperation.UPDATE : GridCacheOperation.DELETE; - - KeyCacheObject key = cctx.toCacheKeyObject(i); - - CacheObject val = null; - - if (op != GridCacheOperation.DELETE) - val = cctx.toCacheObject("value-" + i); - - entries.add(new DataEntry(cctx.cacheId(), key, val, op, null, cctx.versions().next(), 0L, - cctx.affinity().partition(i), i)); - } - - UUID cpId = UUID.randomUUID(); - - WALPointer start = wal.log(new CheckpointRecord(cpId, null, false)); - - wal.fsync(start); - - for (DataEntry entry : entries) - wal.log(new DataRecord(entry)); - - WALPointer end = wal.log(new CheckpointRecord(cpId, start, true)); - - wal.fsync(end); - - // Data will not be written to the page store. - stopAllGrids(); - - ig = startGrid(0); - - sharedCtx = ig.context().cache().context(); - cctx = sharedCtx.cache().cache("partitioned").context(); - - db = (GridCacheDatabaseSharedManager)sharedCtx.database(); - wal = sharedCtx.wal(); - - db.enableCheckpoints(false); - - try (WALIterator it = wal.replay(start)) { - IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); - - assert tup.get2() instanceof CheckpointRecord; - - assertEquals(start, tup.get1()); - - CheckpointRecord cpRec = (CheckpointRecord)tup.get2(); - - assertEquals(cpId, cpRec.checkpointId()); - assertNull(cpRec.checkpointMark()); - assertFalse(cpRec.end()); - - int idx = 0; - CacheObjectContext coctx = cctx.cacheObjectContext(); - - while (idx < entries.size()) { - tup = it.nextX(); - - assert tup.get2() instanceof DataRecord; - - DataRecord dataRec = (DataRecord)tup.get2(); - - DataEntry entry = entries.get(idx); - - assertEquals(1, dataRec.writeEntries().size()); - - DataEntry readEntry = dataRec.writeEntries().get(0); - - assertEquals(entry.cacheId(), readEntry.cacheId()); - assertEquals(entry.key().<Integer>value(coctx, true), readEntry.key().<Integer>value(coctx, true)); - assertEquals(entry.op(), readEntry.op()); - - if (entry.op() == GridCacheOperation.UPDATE) - assertEquals(entry.value().value(coctx, true), readEntry.value().value(coctx, true)); - else - assertNull(entry.value()); - - assertEquals(entry.writeVersion(), readEntry.writeVersion()); - assertEquals(entry.nearXidVersion(), readEntry.nearXidVersion()); - assertEquals(entry.partitionCounter(), readEntry.partitionCounter()); - - idx++; - } - - tup = it.nextX(); - - assert tup.get2() instanceof CheckpointRecord; - - assertEquals(end, tup.get1()); - - cpRec = (CheckpointRecord)tup.get2(); - - assertEquals(cpId, cpRec.checkpointId()); - assertEquals(start, cpRec.checkpointMark()); - assertTrue(cpRec.end()); - } - } - - /** - * @throws Exception if failed. - */ - public void testPageWalEntries() throws Exception { - IgniteEx ig = startGrid(0); - - GridCacheSharedContext<Object, Object> sharedCtx = ig.context().cache().context(); - int cacheId = sharedCtx.cache().cache("partitioned").context().cacheId(); - - GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)sharedCtx.database(); - PageMemory pageMem = sharedCtx.database().memoryPolicy(null).pageMemory(); - IgniteWriteAheadLogManager wal = sharedCtx.wal(); - - db.enableCheckpoints(false).get(); - - int pageCnt = 100; - - List<FullPageId> pageIds = new ArrayList<>(); - - for (int i = 0; i < pageCnt; i++) { - db.checkpointReadLock(); - try { - pageIds.add(new FullPageId(pageMem.allocatePage(cacheId, PageIdAllocator.INDEX_PARTITION, - PageIdAllocator.FLAG_IDX), cacheId)); - } - finally { - db.checkpointReadUnlock(); - } - } - - UUID cpId = UUID.randomUUID(); - - WALPointer start = wal.log(new CheckpointRecord(cpId, null, false)); - - wal.fsync(start); - - ig.context().cache().context().database().checkpointReadLock(); - - try { - for (FullPageId pageId : pageIds) - writePageData(pageId, pageMem); - } - finally { - ig.context().cache().context().database().checkpointReadUnlock(); - } - - WALPointer end = wal.log(new CheckpointRecord(cpId, start, true)); - - wal.fsync(end); - - // Data will not be written to the page store. - stopAllGrids(); - - ig = startGrid(0); - - sharedCtx = ig.context().cache().context(); - - db = (GridCacheDatabaseSharedManager)sharedCtx.database(); - wal = sharedCtx.wal(); - - db.enableCheckpoints(false); - - try (WALIterator it = wal.replay(start)) { - IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); - - assert tup.get2() instanceof CheckpointRecord : tup.get2(); - - assertEquals(start, tup.get1()); - - CheckpointRecord cpRec = (CheckpointRecord)tup.get2(); - - assertEquals(cpId, cpRec.checkpointId()); - assertNull(cpRec.checkpointMark()); - assertFalse(cpRec.end()); - - int idx = 0; - - while (idx < pageIds.size()) { - tup = it.nextX(); - - assert tup.get2() instanceof PageSnapshot : tup.get2().getClass(); - - PageSnapshot snap = (PageSnapshot)tup.get2(); - - //there are extra tracking pages, skip them - if (TrackingPageIO.VERSIONS.latest().trackingPageFor(snap.fullPageId().pageId(), pageMem.pageSize()) == snap.fullPageId().pageId()) { - tup = it.nextX(); - - assertTrue(tup.get2() instanceof PageSnapshot); - - snap = (PageSnapshot)tup.get2(); - } - - assertEquals(pageIds.get(idx), snap.fullPageId()); - - idx++; - } - - tup = it.nextX(); - - assert tup.get2() instanceof CheckpointRecord; - - assertEquals(end, tup.get1()); - - cpRec = (CheckpointRecord)tup.get2(); - - assertEquals(cpId, cpRec.checkpointId()); - assertEquals(start, cpRec.checkpointMark()); - assertTrue(cpRec.end()); - } - } - - /** - * @throws Exception if failed. - */ - public void testDirtyFlag() throws Exception { - IgniteEx ig = startGrid(0); - - GridCacheSharedContext<Object, Object> shared = ig.context().cache().context(); - - int cacheId = shared.cache().cache("partitioned").context().cacheId(); - - GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)shared.database(); - - // Disable integrated checkpoint thread. - dbMgr.enableCheckpoints(false); - - PageMemoryEx mem = (PageMemoryEx) dbMgr.memoryPolicy(null).pageMemory(); - - ig.context().cache().context().database().checkpointReadLock(); - - FullPageId[] pageIds = new FullPageId[100]; - - try { - for (int i = 0; i < pageIds.length; i++) - pageIds[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); - - for (FullPageId fullId : pageIds) { - long page = mem.acquirePage(fullId.cacheId(), fullId.pageId()); - - try { - assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); //page is dirty right after allocation - - long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page); - - PageIO.setPageId(pageAddr, fullId.pageId()); - - try { - assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); - } - finally { - mem.writeUnlock(fullId.cacheId(), fullId.pageId(),page, null,true); - } - - assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); - } - finally { - mem.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - } - finally { - ig.context().cache().context().database().checkpointReadUnlock(); - } - - Collection<FullPageId> cpPages = mem.beginCheckpoint(); - - ig.context().cache().context().database().checkpointReadLock(); - - try { - for (FullPageId fullId : pageIds) { - assertTrue(cpPages.contains(fullId)); - - ByteBuffer buf = ByteBuffer.allocate(mem.pageSize()); - - long page = mem.acquirePage(fullId.cacheId(), fullId.pageId()); - - try { - assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); - - long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page); - - try { - assertFalse(mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); - - for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) - PageUtils.putByte(pageAddr, i, (byte)1); - } - finally { - mem.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, true); - } - - assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); - - buf.rewind(); - - mem.getForCheckpoint(fullId, buf, null); - - buf.position(PageIO.COMMON_HEADER_END); - - while (buf.hasRemaining()) - assertEquals((byte)0, buf.get()); - } - finally { - mem.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - } - finally { - ig.context().cache().context().database().checkpointReadUnlock(); - } - - mem.finishCheckpoint(); - - for (FullPageId fullId : pageIds) { - long page = mem.acquirePage(fullId.cacheId(), fullId.pageId()); - try { - assertTrue(mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); - } - finally { - mem.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - } - - /** - * @throws Exception if failed. - */ - private void writePageData(FullPageId fullId, PageMemory mem) throws Exception { - long page = mem.acquirePage(fullId.cacheId(), fullId.pageId()); - try { - long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page); - - try { - DataPageIO.VERSIONS.latest().initNewPage(pageAddr, fullId.pageId(), mem.pageSize()); - - ThreadLocalRandom rnd = ThreadLocalRandom.current(); - - for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) - PageUtils.putByte(pageAddr, i, (byte)rnd.nextInt(255)); - } - finally { - mem.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, true); - } - } - finally { - mem.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - - /** - * @param res Result map to verify. - * @param mem Memory. - */ - private void verifyReads( - Map<FullPageId, Integer> res, - PageMemory mem, - WALPointer start, - IgniteWriteAheadLogManager wal - ) throws Exception { - Map<FullPageId, byte[]> replay = new HashMap<>(); - - try (WALIterator it = wal.replay(start)) { - IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); - - assertTrue("Invalid record: " + tup, tup.get2() instanceof CheckpointRecord); - - CheckpointRecord cpRec = (CheckpointRecord)tup.get2(); - - while (it.hasNextX()) { - tup = it.nextX(); - - WALRecord rec = tup.get2(); - - if (rec instanceof CheckpointRecord) { - CheckpointRecord end = (CheckpointRecord)rec; - - // Found the finish mark. - if (end.checkpointId().equals(cpRec.checkpointId()) && end.end()) - break; - } - else if (rec instanceof PageSnapshot) { - PageSnapshot page = (PageSnapshot)rec; - - replay.put(page.fullPageId(), page.pageData()); - } - } - } - - // Check read-through from the file store. - for (Map.Entry<FullPageId, Integer> entry : res.entrySet()) { - FullPageId fullId = entry.getKey(); - int state = entry.getValue(); - - if (state == -1) { - info("Page was never written: " + fullId); - - continue; - } - - byte[] walData = replay.get(fullId); - - assertNotNull("Missing WAL record for a written page: " + fullId, walData); - - long page = mem.acquirePage(fullId.cacheId(), fullId.pageId()); - try { - long pageAddr = mem.readLock(fullId.cacheId(), fullId.pageId(), page); - - try { - for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) { - assertEquals("Invalid state [pageId=" + fullId + ", pos=" + i + ']', - state & 0xFF, PageUtils.getByte(pageAddr, i) & 0xFF); - - assertEquals("Invalid WAL state [pageId=" + fullId + ", pos=" + i + ']', - state & 0xFF, walData[i] & 0xFF); - } - } - finally { - mem.readUnlock(fullId.cacheId(), fullId.pageId(), page); - } - } - finally { - mem.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - } - - /** - * @param mem Memory to use. - * @param storeMgr Store manager. - * @param cacheId Cache ID. - * @return Result map of random operations. - * @throws Exception If failure occurred. - */ - private IgniteBiTuple<Map<FullPageId, Integer>, WALPointer> runCheckpointing( - final IgniteEx ig, - final PageMemoryImpl mem, - final IgnitePageStoreManager storeMgr, - final IgniteWriteAheadLogManager wal, - final int cacheId - ) throws Exception { - final ConcurrentMap<FullPageId, Integer> resMap = new ConcurrentHashMap<>(); - - final FullPageId pages[] = new FullPageId[TOTAL_PAGES]; - Set<FullPageId> allocated = new HashSet<>(); - - for (int i = 0; i < TOTAL_PAGES; i++) { - FullPageId fullId = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); - - resMap.put(fullId, -1); - - pages[i] = fullId; - allocated.add(fullId); - } - - final AtomicBoolean run = new AtomicBoolean(true); - - // Simulate transaction lock. - final ReadWriteLock updLock = new ReentrantReadWriteLock(); - - // Mark the start position. - CheckpointRecord cpRec = new CheckpointRecord(null, false); - - WALPointer start = wal.log(cpRec); - - wal.fsync(start); - - IgniteInternalFuture<Long> updFut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - while (true) { - FullPageId fullId = pages[ThreadLocalRandom.current().nextInt(TOTAL_PAGES)]; - - updLock.readLock().lock(); - - try { - if (!run.get()) - return null; - - ig.context().cache().context().database().checkpointReadLock(); - - try { - long page = mem.acquirePage(fullId.cacheId(), fullId.pageId()); - - try { - long pageAddr = mem.writeLock(fullId.cacheId(), fullId.pageId(), page); - - PageIO.setPageId(pageAddr, fullId.pageId()); - - try { - int state = resMap.get(fullId); - - if (state != -1) { - if (VERBOSE) - info("Verify page [fullId=" + fullId + ", state=" + state + - ", buf=" + pageAddr + - ", bhc=" + U.hexLong(System.identityHashCode(pageAddr)) + - ", page=" + U.hexLong(System.identityHashCode(page)) + ']'); - - for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) - assertEquals("Verify page failed [fullId=" + fullId + - ", i=" + i + - ", state=" + state + - ", buf=" + pageAddr + - ", bhc=" + U.hexLong(System.identityHashCode(pageAddr)) + - ", page=" + U.hexLong(System.identityHashCode(page)) + ']', - state & 0xFF, PageUtils.getByte(pageAddr, i) & 0xFF); - } - - state = (state + 1) & 0xFF; - - if (VERBOSE) - info("Write page [fullId=" + fullId + ", state=" + state + - ", buf=" + pageAddr + - ", bhc=" + U.hexLong(System.identityHashCode(pageAddr)) + - ", page=" + U.hexLong(System.identityHashCode(page)) + ']'); - - for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) - PageUtils.putByte(pageAddr, i, (byte)state); - - resMap.put(fullId, state); - } - finally { - mem.writeUnlock(fullId.cacheId(), fullId.pageId(),page, null,true); - } - } - finally { - mem.releasePage(fullId.cacheId(), fullId.pageId(),page);} - } - finally { - ig.context().cache().context().database().checkpointReadUnlock(); - } - } - finally { - updLock.readLock().unlock(); - } - } - } - }, 8, "update-thread"); - - int checkpoints = 20; - - while (checkpoints > 0) { - Map<FullPageId, Integer> snapshot = null; - - Collection<FullPageId> pageIds; - - updLock.writeLock().lock(); - - try { - snapshot = new HashMap<>(resMap); - - pageIds = mem.beginCheckpoint(); - - checkpoints--; - - if (checkpoints == 0) - // No more writes should be done at this point. - run.set(false); - - info("Acquired pages for checkpoint: " + pageIds.size()); - } - finally { - updLock.writeLock().unlock(); - } - - boolean ok = false; - - try { - ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize()); - - tmpBuf.order(ByteOrder.nativeOrder()); - - long begin = System.currentTimeMillis(); - - long cp = 0; - - long write = 0; - - for (FullPageId fullId : pageIds) { - long cpStart = System.nanoTime(); - - Integer tag = mem.getForCheckpoint(fullId, tmpBuf, null); - - if (tag == null) - continue; - - long cpEnd = System.nanoTime(); - - cp += cpEnd - cpStart; - - Integer state = snapshot.get(fullId); - - if (allocated.contains(fullId) && state != -1) { - tmpBuf.rewind(); - - Integer first = null; - - for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++) { - int val = tmpBuf.get(i) & 0xFF; - - if (first == null) - first = val; - - // Avoid string concat. - if (first != val) - assertEquals("Corrupted buffer at position [pageId=" + fullId + ", pos=" + i + ']', - (int)first, val); - - // Avoid string concat. - if (state != val) - assertEquals("Invalid value at position [pageId=" + fullId + ", pos=" + i + ']', - (int)state, val); - } - } - - tmpBuf.rewind(); - - long writeStart = System.nanoTime(); - - storeMgr.write(cacheId, fullId.pageId(), tmpBuf, tag); - - long writeEnd = System.nanoTime(); - - write += writeEnd - writeStart; - - tmpBuf.rewind(); - } - - long syncStart = System.currentTimeMillis(); - - storeMgr.sync(cacheId, 0); - - long end = System.currentTimeMillis(); - - info("Written pages in " + (end - begin) + "ms, copy took " + (cp / 1_000_000) + "ms, " + - "write took " + (write / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms"); - - ok = true; - } - finally { - info("Finishing checkpoint..."); - - mem.finishCheckpoint(); - - info("Finished checkpoint"); - - if (!ok) { - info("Cancelling updates..."); - - run.set(false); - - updFut.get(); - } - } - - if (checkpoints != 0) - Thread.sleep(2_000); - } - - info("checkpoints=" + checkpoints + ", done=" + updFut.isDone()); - - updFut.get(); - - // Mark the end. - wal.fsync(wal.log(new CheckpointRecord(cpRec.checkpointId(), start, true))); - - assertEquals(0, mem.activePagesCount()); - - for (FullPageId fullId : pages) { - - long page = mem.acquirePage(fullId.cacheId(), fullId.pageId()); - - try { - assertFalse("Page has a temp heap copy after the last checkpoint: [cacheId=" + - fullId.cacheId() + ", pageId=" + fullId.pageId() + "]", mem.hasTempCopy(page)); - - assertFalse("Page is dirty after the last checkpoint: [cacheId=" + - fullId.cacheId() + ", pageId=" + fullId.pageId() + "]", mem.isDirty(fullId.cacheId(), fullId.pageId(), page)); - } - finally { - mem.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - - return F.t((Map<FullPageId, Integer>)resMap, start); - } - - /** - * - */ - private void deleteWorkFiles() throws IgniteCheckedException { - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/cd4d0400/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/PageStoreEvictionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/PageStoreEvictionSelfTest.java b/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/PageStoreEvictionSelfTest.java deleted file mode 100644 index 2ef4524..0000000 --- a/modules/pds/src/test/java/org/apache/ignite/cache/database/db/file/PageStoreEvictionSelfTest.java +++ /dev/null @@ -1,297 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.cache.database.db.file; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.concurrent.Callable; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.MemoryConfiguration; -import org.apache.ignite.configuration.MemoryPolicyConfiguration; -import org.apache.ignite.configuration.PersistentStoreConfiguration; -import org.apache.ignite.internal.IgniteEx; -import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.pagemem.FullPageId; -import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.pagemem.PageUtils; -import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.processors.cache.database.IgniteCacheDatabaseSharedManager; -import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.testframework.GridTestUtils; -import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; - -/** - * Test for page evictions. - */ -public class PageStoreEvictionSelfTest extends GridCommonAbstractTest { - /** */ - private static final int NUMBER_OF_SEGMENTS = 64; - - /** */ - private static final int PAGE_SIZE = 1024; - - /** */ - private static final long CHUNK_SIZE = 1024 * 1024; - - /** */ - private static final long MEMORY_LIMIT = 10 * CHUNK_SIZE; - - /** */ - private static final int PAGES_NUM = 128_000; - - /** Cache name. */ - private final String cacheName = "cache"; - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - final IgniteConfiguration cfg = super.getConfiguration(gridName); - - cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration()); - - cfg.setMemoryConfiguration(createDbConfig()); - - cfg.setCacheConfiguration(new CacheConfiguration<>(cacheName)); - - return cfg; - } - - /** - * @return DB config. - */ - private MemoryConfiguration createDbConfig() { - final MemoryConfiguration memCfg = new MemoryConfiguration(); - - MemoryPolicyConfiguration memPlcCfg = new MemoryPolicyConfiguration(); - memPlcCfg.setInitialSize(MEMORY_LIMIT); - memPlcCfg.setMaxSize(MEMORY_LIMIT); - memPlcCfg.setName("dfltMemPlc"); - - memCfg.setPageSize(PAGE_SIZE); - memCfg.setConcurrencyLevel(NUMBER_OF_SEGMENTS); - memCfg.setMemoryPolicies(memPlcCfg); - memCfg.setDefaultMemoryPolicyName("dfltMemPlc"); - - return memCfg; - } - - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - super.beforeTestsStarted(); - - deleteWorkFiles(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - stopAllGrids(); - - deleteWorkFiles(); - } - - /** - * @throws Exception If fail. - */ - public void testPageEviction() throws Exception { - final IgniteEx ig = startGrid(0); - - final PageMemory memory = getMemory(ig); - - writeData(ig, memory, CU.cacheId(cacheName)); - } - - /** - * @param memory Page memory. - * @param cacheId Cache id. - * @throws IgniteCheckedException If failed. - */ - private void writeData(final IgniteEx ignite, final PageMemory memory, final int cacheId) throws Exception { - final int size = PAGES_NUM; - - final List<FullPageId> pageIds = new ArrayList<>(size); - - IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database(); - - // Allocate. - for (int i = 0; i < size; i++) { - db.checkpointReadLock(); - try { - final FullPageId fullId = new FullPageId(memory.allocatePage(cacheId, i % 256, PageMemory.FLAG_DATA), - cacheId); - - pageIds.add(fullId); - } - finally { - db.checkpointReadUnlock(); - } - } - - System.out.println("Allocated pages: " + pageIds.size()); - - // Write data. (Causes evictions.) - final int part = PAGES_NUM / NUMBER_OF_SEGMENTS; - - final Collection<IgniteInternalFuture> futs = new ArrayList<>(); - - for (int i = 0; i < PAGES_NUM; i += part) - futs.add(runWriteInThread(ignite, i, i + part, memory, pageIds)); - - for (final IgniteInternalFuture fut : futs) - fut.get(); - - System.out.println("Wrote pages: " + pageIds.size()); - - // Read data. (Causes evictions.) - futs.clear(); - - for (int i = 0; i < PAGES_NUM; i += part) - futs.add(runReadInThread(ignite, i, i + part, memory, pageIds)); - - for (final IgniteInternalFuture fut : futs) - fut.get(); - - System.out.println("Read pages: " + pageIds.size()); - } - - /** - * @param start Start index. - * @param end End index. - * @param memory PageMemory. - * @param pageIds Allocated pages. - * @return Future. - * @throws Exception If fail. - */ - private IgniteInternalFuture runWriteInThread( - final IgniteEx ignite, - final int start, - final int end, - final PageMemory memory, - final List<FullPageId> pageIds - ) throws Exception { - - return GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database(); - - for (int i = start; i < end; i++) { - db.checkpointReadLock(); - - try { - FullPageId fullId = pageIds.get(i); - - long page = memory.acquirePage(fullId.cacheId(), fullId.pageId()); - - try { - final long pageAddr = memory.writeLock(fullId.cacheId(), fullId.pageId(), page); - - try { - PageIO.setPageId(pageAddr, fullId.pageId()); - - PageUtils.putLong(pageAddr, PageIO.COMMON_HEADER_END, i * 2); - } - finally { - memory.writeUnlock(fullId.cacheId(), fullId.pageId(), page, null, true); - } - } - finally { - memory.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - finally { - db.checkpointReadUnlock(); - } - } - - return null; - } - }); - } - - /** - * @param start Start index. - * @param end End index. - * @param memory PageMemory. - * @param pageIds Allocated pages. - * @return Future. - * @throws Exception If fail. - */ - private IgniteInternalFuture runReadInThread(final IgniteEx ignite, final int start, final int end, - final PageMemory memory, - final List<FullPageId> pageIds) throws Exception { - return GridTestUtils.runAsync(new Callable<Object>() { - @Override public Object call() throws Exception { - IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database(); - - for (int i = start; i < end; i++) { - db.checkpointReadLock(); - - try { - final FullPageId fullId = pageIds.get(i); - - long page = memory.acquirePage(fullId.cacheId(), fullId.pageId()); - try { - final long pageAddr = memory.readLock(fullId.cacheId(), fullId.pageId(), page); - - try { - assertEquals(i * 2, PageUtils.getLong(pageAddr, PageIO.COMMON_HEADER_END)); - } - finally { - memory.readUnlock(fullId.cacheId(), fullId.pageId(), page); - } - } - finally { - memory.releasePage(fullId.cacheId(), fullId.pageId(), page); - } - } - finally { - db.checkpointReadUnlock(); - } - } - - return null; - } - }); - } - - /** - * @param ig Ignite instance. - * @return Memory and store. - * @throws Exception If failed to initialize the store. - */ - private PageMemory getMemory(IgniteEx ig) throws Exception { - final GridCacheSharedContext<Object, Object> sharedCtx = ig.context().cache().context(); - - final IgniteCacheDatabaseSharedManager db = sharedCtx.database(); - - return db.memoryPolicy(null).pageMemory(); - } - - /** - * @throws IgniteCheckedException If fail. - */ - private void deleteWorkFiles() throws IgniteCheckedException { - deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "db", false)); - } -}