IGNITE-6423: PDS could be corrupted if a partition has been evicted and owned again. This closes #3115.
Fixed page memory update operations without checkpoint lock. Fixed page CRC calculation. Fixed outdated page handling. Added checkpoint lock hold assertions for memory update operations. Fixed incorrect tests. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e24d4d03 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e24d4d03 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e24d4d03 Branch: refs/heads/ignite-zk Commit: e24d4d03dd00cc464e6e793e2a3f23988664a0cd Parents: 24412f5 Author: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Authored: Wed Dec 13 14:16:50 2017 +0300 Committer: Andrey V. Mashenkov <andrey.mashen...@gmail.com> Committed: Wed Dec 13 14:16:50 2017 +0300 ---------------------------------------------------------------------- .../internal/pagemem/store/PageStore.java | 5 +- .../processors/cache/GridCacheTtlManager.java | 3 +- .../cache/IgniteCacheOffheapManagerImpl.java | 41 +++-- .../distributed/dht/GridDhtTxPrepareFuture.java | 5 + .../local/atomic/GridLocalAtomicCache.java | 171 ++++++++++--------- .../GridCacheDatabaseSharedManager.java | 9 +- .../persistence/GridCacheOffheapManager.java | 4 + .../processors/cache/persistence/RowStore.java | 2 + .../cache/persistence/file/FilePageStore.java | 32 +++- .../persistence/file/FilePageStoreManager.java | 7 +- .../persistence/pagemem/PageMemoryImpl.java | 56 ++++-- .../processors/cache/tree/CacheDataTree.java | 2 + .../cache/tree/PendingEntriesTree.java | 2 + .../pagemem/impl/PageMemoryNoLoadSelfTest.java | 10 +- .../cache/persistence/DummyPageIO.java | 41 +++++ .../IgnitePdsContinuousRestartTest.java | 5 - ...gnitePdsRecoveryAfterFileCorruptionTest.java | 83 ++++++--- ...ckpointSimulationWithRealCpDisabledTest.java | 53 +++++- .../db/file/IgnitePdsEvictionTest.java | 30 ++++ .../query/h2/database/H2TreeIndex.java | 58 ++++--- 20 files changed, 439 insertions(+), 180 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java index f6e577c..42d584d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java @@ -71,10 +71,11 @@ public interface PageStore { * @param pageId Page ID. * @param pageBuf Page buffer to write. * @param tag Partition file version, 1-based incrementing counter. For outdated pages {@code tag} has lower value, - * and write does nothing + * and write does nothing. + * @param calculateCrc if {@code False} crc calculation will be forcibly skipped. * @throws IgniteCheckedException If page writing failed (IO error occurred). */ - public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException; + public void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteCheckedException; /** * Gets page offset within the store file. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java index b006154..9c013fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheTtlManager.java @@ -23,7 +23,6 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.util.GridConcurrentSkipListSet; import org.apache.ignite.internal.util.lang.IgniteInClosure2X; import org.apache.ignite.internal.util.typedef.X; @@ -145,7 +144,7 @@ public class GridCacheTtlManager extends GridCacheManagerAdapter { try { X.println(">>>"); X.println(">>> TTL processor memory stats [igniteInstanceName=" + cctx.igniteInstanceName() + - ", cache=" + cctx.name() + ']'); + ", cache=" + cctx.name() + ']'); X.println(">>> pendingEntriesSize: " + pendingSize()); } catch (IgniteCheckedException e) { http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 370a92e..8ad6d4b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -139,20 +139,20 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException{ + public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { String name = "PendingEntries"; - long rootPage = allocateForTree(); + long rootPage = allocateForTree(); - pendingEntries = new PendingEntriesTree( - grp, - name, - grp.dataRegion().pageMemory(), - rootPage, - grp.reuseList(), - true); - } + pendingEntries = new PendingEntriesTree( + grp, + name, + grp.dataRegion().pageMemory(), + rootPage, + grp.reuseList(), + true); + } } /** @@ -196,6 +196,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager try { if (grp.sharedGroup()) { assert cacheId != CU.UNDEFINED_CACHE_ID; + assert ctx.database().checkpointLockIsHeldByThread(); for (CacheDataStore store : cacheDataStores()) store.clear(cacheId); @@ -437,7 +438,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager while (it.hasNext()) { cctx.shared().database().checkpointReadLock(); - try{ + try { KeyCacheObject key = it.next().key(); try { @@ -1192,6 +1193,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager try { int cacheId = grp.sharedGroup() ? 
cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + assert cctx.shared().database().checkpointLockIsHeldByThread(); + dataTree.invoke(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY, c); switch (c.operationType()) { @@ -1232,8 +1235,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager CacheObject val, GridCacheVersion ver, long expireTime, - @Nullable CacheDataRow oldRow) throws IgniteCheckedException - { + @Nullable CacheDataRow oldRow) throws IgniteCheckedException { int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; DataRow dataRow = new DataRow(key, val, ver, partId, expireTime, cacheId); @@ -1258,8 +1260,9 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager } /** {@inheritDoc} */ - @Override public void update(GridCacheContext cctx,KeyCacheObject key, - + @Override public void update( + GridCacheContext cctx, + KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, @@ -1285,6 +1288,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager CacheDataRow old; + assert cctx.shared().database().checkpointLockIsHeldByThread(); + if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow)) { old = oldRow; @@ -1363,6 +1368,8 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager try { int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; + assert cctx.shared().database().checkpointLockIsHeldByThread(); + CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, key)); finishRemove(cctx, key, oldRow); @@ -1425,8 +1432,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager return dataTree.find(null, null); } - /** {@inheritDoc} - * @param cacheId*/ + /** {@inheritDoc} */ @Override public GridCursor<? 
extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { return cursor(cacheId, null, null); } @@ -1486,6 +1492,7 @@ public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager /** {@inheritDoc} */ @Override public void clear(int cacheId) throws IgniteCheckedException { assert cacheId != CU.UNDEFINED_CACHE_ID; + assert ctx.database().checkpointLockIsHeldByThread(); if (cacheSize(cacheId) == 0) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index 6873890..0fb9ee4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -1819,6 +1819,8 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite GridDrType drType = cacheCtx.isDrEnabled() ? 
GridDrType.DR_PRELOAD : GridDrType.DR_NONE; + cctx.database().checkpointReadLock(); + try { if (entry.initialValue(info.value(), info.version(), @@ -1850,6 +1852,9 @@ public final class GridDhtTxPrepareFuture extends GridCacheCompoundFuture<Ignite log.debug("Failed to set entry initial value (entry is obsolete, " + "will retry): " + entry); } + finally { + cctx.database().checkpointReadUnlock(); + } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index 599a58c..1454e96 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -830,113 +830,120 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { CacheEntryPredicate[] filters = CU.filterArray(filter); - ctx.shared().database().ensureFreeSpace(ctx.dataRegion()); - - if (writeThrough && keys.size() > 1) { - return updateWithBatch(op, - keys, - vals, - invokeArgs, - expiryPlc, - ver, - filters, - keepBinary, - subjId, - taskName); - } - - Iterator<?> valsIter = vals != null ? 
vals.iterator() : null; - IgniteBiTuple<Boolean, ?> res = null; CachePartialUpdateCheckedException err = null; - boolean intercept = ctx.config().getInterceptor() != null; + ctx.shared().database().checkpointReadLock(); - for (K key : keys) { - if (key == null) - throw new NullPointerException("Null key."); + try { + ctx.shared().database().ensureFreeSpace(ctx.dataRegion()); + + if (writeThrough && keys.size() > 1) { + return updateWithBatch(op, + keys, + vals, + invokeArgs, + expiryPlc, + ver, + filters, + keepBinary, + subjId, + taskName); + } - Object val = valsIter != null ? valsIter.next() : null; + Iterator<?> valsIter = vals != null ? vals.iterator() : null; - if (val == null && op != DELETE) - throw new NullPointerException("Null value."); + boolean intercept = ctx.config().getInterceptor() != null; - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); + for (K key : keys) { + if (key == null) + throw new NullPointerException("Null key."); - if (op == UPDATE) { - val = ctx.toCacheObject(val); + Object val = valsIter != null ? valsIter.next() : null; - ctx.validateKeyAndValue(cacheKey, (CacheObject)val); - } - else if (op == TRANSFORM) - ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name()); + if (val == null && op != DELETE) + throw new NullPointerException("Null value."); - while (true) { - GridCacheEntryEx entry = null; + KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - try { - entry = entryEx(cacheKey); - - GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( - ver, - val == null ? 
DELETE : op, - val, - invokeArgs, - writeThrough, - readThrough, - retval, - keepBinary, - expiryPlc, - true, - true, - filters, - intercept, - subjId, - taskName); + if (op == UPDATE) { + val = ctx.toCacheObject(val); - if (op == TRANSFORM) { - if (t.get3() != null) { - Map<K, EntryProcessorResult> computedMap; + ctx.validateKeyAndValue(cacheKey, (CacheObject)val); + } + else if (op == TRANSFORM) + ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name()); - if (res == null) { - computedMap = U.newHashMap(keys.size()); + while (true) { + GridCacheEntryEx entry = null; - res = new IgniteBiTuple<>(true, computedMap); - } - else - computedMap = (Map<K, EntryProcessorResult>)res.get2(); + try { + entry = entryEx(cacheKey); - computedMap.put(key, t.get3()); + GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( + ver, + val == null ? DELETE : op, + val, + invokeArgs, + writeThrough, + readThrough, + retval, + keepBinary, + expiryPlc, + true, + true, + filters, + intercept, + subjId, + taskName); + + if (op == TRANSFORM) { + if (t.get3() != null) { + Map<K, EntryProcessorResult> computedMap; + + if (res == null) { + computedMap = U.newHashMap(keys.size()); + + res = new IgniteBiTuple<>(true, computedMap); + } + else + computedMap = (Map<K, EntryProcessorResult>)res.get2(); + + computedMap.put(key, t.get3()); + } } - } - else if (res == null) - res = new T2(t.get1(), t.get2()); + else if (res == null) + res = new T2(t.get1(), t.get2()); - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - if (log.isDebugEnabled()) - log.debug("Got removed entry while updating (will retry): " + key); + break; // While. 
+ } + catch (GridCacheEntryRemovedException ignored) { + if (log.isDebugEnabled()) + log.debug("Got removed entry while updating (will retry): " + key); - entry = null; - } - catch (IgniteCheckedException e) { - if (err == null) - err = partialUpdateException(); + entry = null; + } + catch (IgniteCheckedException e) { + if (err == null) + err = partialUpdateException(); - err.add(F.asList(key), e); + err.add(F.asList(key), e); - U.error(log, "Failed to update key : " + key, e); + U.error(log, "Failed to update key : " + key, e); - break; - } - finally { - if (entry != null) - ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); + break; + } + finally { + if (entry != null) + ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); + } } } } + finally { + ctx.shared().database().checkpointReadUnlock(); + } if (err != null) throw err; http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index c0e59bc..3c2842f 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -1850,7 +1850,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (tag != null) { tmpWriteBuf.rewind(); - PageStore store = storeMgr.writeInternal(fullId.groupId(), fullId.pageId(), tmpWriteBuf, tag); + PageStore store = storeMgr.writeInternal(fullId.groupId(), fullId.pageId(), tmpWriteBuf, tag, true); 
tmpWriteBuf.rewind(); @@ -2640,6 +2640,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null); if (tag != null) { + assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId()); + assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId()); + tmpWriteBuf.rewind(); if (persStoreMetrics.metricsEnabled()) { @@ -2661,9 +2664,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan tmpWriteBuf.rewind(); - PageIO.setCrc(writeAddr, 0); - - PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag); + PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, false); updStores.add(store); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java index cfa1829..e818b00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java @@ -922,6 +922,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple reuseRoot.pageId().pageId(), reuseRoot.isAllocated()) { @Override protected long allocatePageNoReuse() throws IgniteCheckedException { + assert grp.shared().database().checkpointLockIsHeldByThread(); + return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA); } 
}; @@ -938,6 +940,8 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple treeRoot.pageId().pageId(), treeRoot.isAllocated()) { @Override protected long allocatePageNoReuse() throws IgniteCheckedException { + assert grp.shared().database().checkpointLockIsHeldByThread(); + return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA); } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java index 2051021..ad2f731 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/RowStore.java @@ -107,6 +107,8 @@ public class RowStore { * @return {@code True} if was able to update row. 
*/ public boolean updateRow(long link, CacheDataRow row) throws IgniteCheckedException { + assert !persistenceEnabled || ctx.database().checkpointLockIsHeldByThread(); + return freeList.updateDataRow(link, row); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index 408240c..47f1d4d 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -433,7 +433,7 @@ public class FilePageStore implements PageStore { } /** {@inheritDoc} */ - @Override public void write(long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException { + @Override public void write(long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteCheckedException { init(); lock.readLock().lock(); @@ -450,13 +450,20 @@ public class FilePageStore implements PageStore { assert pageBuf.capacity() == pageSize; assert pageBuf.position() == 0; assert pageBuf.order() == ByteOrder.nativeOrder(); - assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId); + assert PageIO.getType(pageBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(pageId); + assert PageIO.getVersion(pageBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(pageId); - int crc32 = skipCrc ? 
0 : PureJavaCrc32.calcCrc32(pageBuf, pageSize); + if (calculateCrc && !skipCrc) { + assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId); - PageIO.setCrc(pageBuf, crc32); + PageIO.setCrc(pageBuf, calcCrc32(pageBuf, pageSize)); + } - pageBuf.position(0); + // Check whether crc was calculated somewhere above the stack if it is forcibly skipped. + assert skipCrc || PageIO.getCrc(pageBuf) != 0 || calcCrc32(pageBuf, pageSize) == 0 : + "CRC hasn't been calculated, crc=0"; + + assert pageBuf.position() == 0 : pageBuf.position(); int len = pageSize; @@ -480,6 +487,21 @@ public class FilePageStore implements PageStore { } } + /** + * @param pageBuf Page buffer. + * @param pageSize Page size. + */ + private static int calcCrc32(ByteBuffer pageBuf, int pageSize) { + try { + pageBuf.position(0); + + return PureJavaCrc32.calcCrc32(pageBuf, pageSize); + } + finally { + pageBuf.position(0); + } + } + /** {@inheritDoc} */ @Override public long pageOffset(long pageId) { return (long) PageIdUtils.pageIndex(pageId) * pageSize + headerSize(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 1fe22ca..66af0dd 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -291,7 +291,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** {@inheritDoc} */ @Override public void write(int grpId, long pageId, ByteBuffer pageBuf, int 
tag) throws IgniteCheckedException { - writeInternal(grpId, pageId, pageBuf, tag); + writeInternal(grpId, pageId, pageBuf, tag, true); } /** {@inheritDoc} */ @@ -306,15 +306,16 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen * @param pageId Page ID. * @param pageBuf Page buffer. * @param tag Partition tag (growing 1-based partition file version). Used to validate page is not outdated + * @param calculateCrc if {@code False} crc calculation will be forcibly skipped. * @return PageStore to which the page has been written. * @throws IgniteCheckedException If IO error occurred. */ - public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException { + public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteCheckedException { int partId = PageIdUtils.partId(pageId); PageStore store = getStore(cacheId, partId); - store.write(pageId, pageBuf, tag); + store.write(pageId, pageBuf, tag, calculateCrc); return store; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index 3014099..41de7f0 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -173,7 +173,7 @@ public class PageMemoryImpl implements PageMemoryEx { private final int sysPageSize; /** Shared context. 
*/ - private final GridCacheSharedContext<?, ?> sharedCtx; + private final GridCacheSharedContext<?, ?> ctx; /** State checker. */ private final CheckpointLockStateChecker stateChecker; @@ -236,7 +236,7 @@ public class PageMemoryImpl implements PageMemoryEx { /** * @param directMemoryProvider Memory allocator to use. - * @param sharedCtx Cache shared context. + * @param ctx Cache shared context. * @param pageSize Page size. * @param flushDirtyPage Callback invoked when a dirty page is evicted. * @param changeTracker Callback invoked to track changes in pages. @@ -245,7 +245,7 @@ public class PageMemoryImpl implements PageMemoryEx { public PageMemoryImpl( DirectMemoryProvider directMemoryProvider, long[] sizes, - GridCacheSharedContext<?, ?> sharedCtx, + GridCacheSharedContext<?, ?> ctx, int pageSize, GridInClosure3X<FullPageId, ByteBuffer, Integer> flushDirtyPage, GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker, @@ -253,11 +253,11 @@ public class PageMemoryImpl implements PageMemoryEx { DataRegionMetricsImpl memMetrics, boolean throttleEnabled ) { - assert sharedCtx != null; + assert ctx != null; - log = sharedCtx.logger(PageMemoryImpl.class); + log = ctx.logger(PageMemoryImpl.class); - this.sharedCtx = sharedCtx; + this.ctx = ctx; this.directMemoryProvider = directMemoryProvider; this.sizes = sizes; this.flushDirtyPage = flushDirtyPage; @@ -265,8 +265,8 @@ public class PageMemoryImpl implements PageMemoryEx { this.stateChecker = stateChecker; this.throttleEnabled = throttleEnabled; - storeMgr = sharedCtx.pageStore(); - walMgr = sharedCtx.wal(); + storeMgr = ctx.pageStore(); + walMgr = ctx.wal(); assert storeMgr != null; assert walMgr != null; @@ -336,15 +336,15 @@ public class PageMemoryImpl implements PageMemoryEx { * */ private void initWriteThrottle() { - if (!(sharedCtx.database() instanceof GridCacheDatabaseSharedManager)) { + if (!(ctx.database() instanceof GridCacheDatabaseSharedManager)) { log.error("Write throttle can't start. 
Unexpected class of database manager: " + - sharedCtx.database().getClass()); + ctx.database().getClass()); throttleEnabled = false; } if (throttleEnabled) - writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)sharedCtx.database()); + writeThrottle = new PagesWriteThrottle(this, (GridCacheDatabaseSharedManager)ctx.database()); } /** {@inheritDoc} */ @@ -423,6 +423,8 @@ public class PageMemoryImpl implements PageMemoryEx { flags == PageIdAllocator.FLAG_IDX && partId == PageIdAllocator.INDEX_PARTITION : "flags = " + flags + ", partId = " + partId; + assert ctx.database().checkpointLockIsHeldByThread(); + long pageId = storeMgr.allocatePage(cacheId, partId, flags); memMetrics.incrementTotalAllocatedPages(); @@ -441,7 +443,19 @@ public class PageMemoryImpl implements PageMemoryEx { boolean isTrackingPage = trackingIO.trackingPageFor(pageId, pageSize()) == pageId; try { - long relPtr = seg.borrowOrAllocateFreePage(pageId); + long relPtr = seg.loadedPages.get( + cacheId, + PageIdUtils.effectivePageId(pageId), + seg.partTag(cacheId, partId), + INVALID_REL_PTR, + OUTDATED_REL_PTR + ); + + if (relPtr == OUTDATED_REL_PTR) + relPtr = refreshOutdatedPage(seg, cacheId, pageId, false); + + if (relPtr == INVALID_REL_PTR) + relPtr = seg.borrowOrAllocateFreePage(pageId); if (relPtr == INVALID_REL_PTR) relPtr = seg.evictPage(); @@ -470,8 +484,8 @@ public class PageMemoryImpl implements PageMemoryEx { if (PageIO.getType(pageAddr) == 0) { trackingIO.initNewPage(pageAddr, pageId, pageSize()); - if (!sharedCtx.wal().isAlwaysWriteFullPages()) - sharedCtx.wal().log( + if (!ctx.wal().isAlwaysWriteFullPages()) + ctx.wal().log( new InitNewPageRecord( cacheId, pageId, @@ -480,7 +494,7 @@ public class PageMemoryImpl implements PageMemoryEx { ) ); else - sharedCtx.wal().log(new PageSnapshot(fullId, absPtr + PAGE_OVERHEAD, pageSize())); + ctx.wal().log(new PageSnapshot(fullId, absPtr + PAGE_OVERHEAD, pageSize())); } } @@ -991,6 +1005,9 @@ public class PageMemoryImpl 
implements PageMemoryEx { PageHeader.releasePage(absPtr); } + assert PageIO.getType(tmpBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId()); + assert PageIO.getVersion(tmpBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId()); + return true; } finally { @@ -1229,6 +1246,9 @@ public class PageMemoryImpl implements PageMemoryEx { pageSize() ); + assert PageIO.getType(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId()); + assert PageIO.getVersion(tmpAbsPtr + PAGE_OVERHEAD) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId()); + PageHeader.dirty(absPtr, false); PageHeader.tempBufferPointer(absPtr, tmpRelPtr); @@ -1269,6 +1289,10 @@ public class PageMemoryImpl implements PageMemoryEx { long pageId = PageIO.getPageId(page + PAGE_OVERHEAD); + assert pageId != 0 : U.hexLong(PageHeader.readPageId(page)); + assert PageIO.getVersion(page + PAGE_OVERHEAD) != 0 : U.hexLong(pageId); + assert PageIO.getType(page + PAGE_OVERHEAD) != 0 : U.hexLong(pageId); + try { rwLock.writeUnlock(page + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); @@ -1390,6 +1414,8 @@ public class PageMemoryImpl implements PageMemoryEx { boolean wasDirty = PageHeader.dirty(absPtr, dirty); if (dirty) { + assert ctx.database().checkpointLockIsHeldByThread(); + if (!wasDirty || forceAdd) { boolean added = segment(pageId.groupId(), pageId.pageId()).dirtyPages.add(pageId); http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java index afa3fd7..f2bfa41 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/CacheDataTree.java @@ -77,6 +77,8 @@ public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> { this.rowStore = rowStore; this.grp = grp; + assert !grp.dataRegion().config().isPersistenceEnabled() || grp.shared().database().checkpointLockIsHeldByThread(); + initTree(initNew); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java index a6ec6e7..0b1c931 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/tree/PendingEntriesTree.java @@ -64,6 +64,8 @@ public class PendingEntriesTree extends BPlusTree<PendingRow, PendingRow> { this.grp = grp; + assert !grp.dataRegion().config().isPersistenceEnabled() || grp.shared().database().checkpointLockIsHeldByThread(); + initTree(initNew); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java index 3b9e393..0f3bf9c 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoLoadSelfTest.java @@ -33,6 +33,7 @@ import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; @@ -47,6 +48,9 @@ public class PageMemoryNoLoadSelfTest extends GridCommonAbstractTest { /** */ private static final int MAX_MEMORY_SIZE = 10 * 1024 * 1024; + /** */ + private static final PageIO PAGE_IO = new DummyPageIO(); + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), "pagemem", false)); @@ -226,6 +230,8 @@ public class PageMemoryNoLoadSelfTest extends GridCommonAbstractTest { assertNotNull(pageAddr); try { + PAGE_IO.initNewPage(pageAddr, id.pageId(), mem.pageSize()); + long updId = PageIdUtils.rotatePageId(id.pageId()); PageIO.setPageId(pageAddr, updId); @@ -334,7 +340,7 @@ public class PageMemoryNoLoadSelfTest extends GridCommonAbstractTest { long pageAddr = mem.writeLock(-1, pageId, page); try { - PageIO.setPageId(pageAddr, pageId); + PAGE_IO.initNewPage(pageAddr, pageId, mem.pageSize()); for (int i = PageIO.COMMON_HEADER_END; i < PAGE_SIZE; i++) PageUtils.putByte(pageAddr, i, (byte)val); @@ -355,7 +361,7 @@ public class PageMemoryNoLoadSelfTest extends GridCommonAbstractTest { long pageAddr = mem.readLock(-1, pageId, page); - assert(pageAddr != 0); + assert pageAddr != 0; try { for (int i = PageIO.COMMON_HEADER_END; i < PAGE_SIZE; i++) { 
http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java new file mode 100644 index 0000000..1b36ac1 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/DummyPageIO.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.util.GridStringBuilder; + +/** + * Dummy PageIO implementation. For test purposes only. 
+ */ +public class DummyPageIO extends PageIO { + /** */ + public DummyPageIO() { + super(2 * Short.MAX_VALUE, 1); + } + + /** {@inheritDoc} */ + @Override + protected void printPage(long addr, int pageSize, GridStringBuilder sb) throws IgniteCheckedException { + sb.a("DummyPageIO [\n"); + sb.a("addr=").a(addr).a(", "); + sb.a("pageSize=").a(addr); + sb.a("\n]"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java index 27b1950..fa89bf2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsContinuousRestartTest.java @@ -221,11 +221,6 @@ public class IgnitePdsContinuousRestartTest extends GridCommonAbstractTest { checkRebalancingDuringLoad(10, 500, 8, 16); } - /** {@inheritDoc} */ - @Override protected long getTestTimeout() { - return TimeUnit.MINUTES.toMillis(3); - } - /** * @throws Exception if failed. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java index 9369443..8e20585 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java @@ -142,20 +142,33 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract PageMemory mem = sharedCtx.database().dataRegion(policyName).pageMemory(); + DummyPageIO pageIO = new DummyPageIO(); + int cacheId = sharedCtx.cache().cache(cacheName).context().cacheId(); FullPageId[] pages = new FullPageId[totalPages]; - for (int i = 0; i < totalPages; i++) - pages[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); + // Get lock to prevent assertion. A new page should be allocated under checkpoint lock. 
+ psMgr.checkpointReadLock(); - generateWal( - (PageMemoryImpl)mem, - sharedCtx.pageStore(), - sharedCtx.wal(), - cacheId, - pages - ); + try { + for (int i = 0; i < totalPages; i++) { + pages[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); + + initPage(mem, pageIO, pages[i]); + } + + generateWal( + (PageMemoryImpl)mem, + sharedCtx.pageStore(), + sharedCtx.wal(), + cacheId, + pages + ); + } + finally { + psMgr.checkpointReadUnlock(); + } eraseDataFromDisk(pageStore, cacheId, pages[0]); @@ -169,6 +182,31 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract } /** + * Initializes page. + * @param mem page memory implementation. + * @param pageIO page io implementation. + * @param fullId full page id. + * @throws IgniteCheckedException if error occurs. + */ + private void initPage(PageMemory mem, PageIO pageIO, FullPageId fullId) throws IgniteCheckedException { + long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); + + try { + final long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page); + + try { + pageIO.initNewPage(pageAddr, fullId.pageId(), mem.pageSize()); + } + finally { + mem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, true); + } + } + finally { + mem.releasePage(fullId.groupId(), fullId.pageId(), page); + } + } + + /** * @param pageStore Page store. * @param cacheId Cache id. * @param page Page. 
@@ -207,21 +245,28 @@ public class IgnitePdsRecoveryAfterFileCorruptionTest extends GridCommonAbstract PageMemory mem = shared.database().dataRegion(null).pageMemory(); - for (FullPageId fullId : pages) { - long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); + dbMgr.checkpointReadLock(); - try { - long pageAddr = mem.readLock(fullId.groupId(), fullId.pageId(), page); + try { + for (FullPageId fullId : pages) { + long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); + + try { + long pageAddr = mem.readLock(fullId.groupId(), fullId.pageId(), page); - for (int j = PageIO.COMMON_HEADER_END; j < mem.pageSize(); j += 4) - assertEquals(j + (int)fullId.pageId(), PageUtils.getInt(pageAddr, j)); + for (int j = PageIO.COMMON_HEADER_END; j < mem.pageSize(); j += 4) + assertEquals(j + (int)fullId.pageId(), PageUtils.getInt(pageAddr, j)); - mem.readUnlock(fullId.groupId(), fullId.pageId(), page); - } - finally { - mem.releasePage(fullId.groupId(), fullId.pageId(), page); + mem.readUnlock(fullId.groupId(), fullId.pageId(), page); + } + finally { + mem.releasePage(fullId.groupId(), fullId.pageId(), page); + } } } + finally { + dbMgr.checkpointReadUnlock(); + } } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java index 5ae8969..0dd9153 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java +++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java @@ -47,7 +47,9 @@ import org.apache.ignite.internal.pagemem.FullPageId; import org.apache.ignite.internal.pagemem.PageIdAllocator; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl; import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO; @@ -519,10 +521,12 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom PageMemoryEx mem = (PageMemoryEx) dbMgr.dataRegion(null).pageMemory(); - ig.context().cache().context().database().checkpointReadLock(); - FullPageId[] pageIds = new FullPageId[100]; + DummyPageIO pageIO = new DummyPageIO(); + + ig.context().cache().context().database().checkpointReadLock(); + try { for (int i = 0; i < pageIds.length; i++) pageIds[i] = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); @@ -535,9 +539,9 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page); - PageIO.setPageId(pageAddr, fullId.pageId()); - try { + pageIO.initNewPage(pageAddr, fullId.pageId(), mem.pageSize()); + assertTrue(mem.isDirty(fullId.groupId(), fullId.pageId(), page)); } finally { @@ -737,8 +741,22 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom Set<FullPageId> allocated = new HashSet<>(); + 
IgniteCacheDatabaseSharedManager db = ig.context().cache().context().database(); + + PageIO pageIO = new DummyPageIO(); + for (int i = 0; i < TOTAL_PAGES; i++) { - FullPageId fullId = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); + FullPageId fullId; + + db.checkpointReadLock(); + try { + fullId = new FullPageId(mem.allocatePage(cacheId, 0, PageIdAllocator.FLAG_DATA), cacheId); + + initPage(mem, pageIO, fullId); + } + finally { + db.checkpointReadUnlock(); + } resMap.put(fullId, -1); @@ -982,4 +1000,29 @@ public class IgnitePdsCheckpointSimulationWithRealCpDisabledTest extends GridCom private void deleteWorkFiles() throws IgniteCheckedException { deleteRecursively(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false)); } + + /** + * Initializes page. + * @param mem page memory implementation. + * @param pageIO page io implementation. + * @param fullId full page id. + * @throws IgniteCheckedException if error occurs. + */ + private void initPage(PageMemory mem, PageIO pageIO, FullPageId fullId) throws IgniteCheckedException { + long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); + + try { + final long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page); + + try { + pageIO.initNewPage(pageAddr, fullId.pageId(), mem.pageSize()); + } + finally { + mem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, true); + } + } + finally { + mem.releasePage(fullId.groupId(), fullId.pageId(), page); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java index 47a4b7b..1b86e3d 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsEvictionTest.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.PageUtils; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; @@ -138,6 +139,8 @@ public class IgnitePdsEvictionTest extends GridCommonAbstractTest { IgniteCacheDatabaseSharedManager db = ignite.context().cache().context().database(); + PageIO pageIO = new DummyPageIO(); + // Allocate. for (int i = 0; i < size; i++) { db.checkpointReadLock(); @@ -145,6 +148,8 @@ public class IgnitePdsEvictionTest extends GridCommonAbstractTest { final FullPageId fullId = new FullPageId(memory.allocatePage(cacheId, i % 256, PageMemory.FLAG_DATA), cacheId); + initPage(memory, pageIO, fullId); + pageIds.add(fullId); } finally { @@ -180,6 +185,31 @@ public class IgnitePdsEvictionTest extends GridCommonAbstractTest { } /** + * Initializes page. + * @param mem page memory implementation. + * @param pageIO page io implementation. + * @param fullId full page id. + * @throws IgniteCheckedException if error occurs. 
+ */ + private void initPage(PageMemory mem, PageIO pageIO, FullPageId fullId) throws IgniteCheckedException { + long page = mem.acquirePage(fullId.groupId(), fullId.pageId()); + + try { + final long pageAddr = mem.writeLock(fullId.groupId(), fullId.pageId(), page); + + try { + pageIO.initNewPage(pageAddr, fullId.pageId(), mem.pageSize()); + } + finally { + mem.writeUnlock(fullId.groupId(), fullId.pageId(), page, null, true); + } + } + finally { + mem.releasePage(fullId.groupId(), fullId.pageId(), page); + } + } + + /** * @param start Start index. * @param end End index. * @param memory PageMemory. http://git-wip-us.apache.org/repos/asf/ignite/blob/e24d4d03/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java index edfaecf..53d5de5 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/database/H2TreeIndex.java @@ -27,6 +27,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.RootPage; import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO; @@ -109,26 +110,35 @@ public class H2TreeIndex extends GridH2IndexBase { segments = new H2Tree[segmentsCnt]; + 
IgniteCacheDatabaseSharedManager db = cctx.shared().database(); + for (int i = 0; i < segments.length; i++) { - RootPage page = getMetaPage(name, i); - - segments[i] = new H2Tree( - name, - cctx.offheap().reuseListForIndex(name), - cctx.groupId(), - cctx.dataRegion().pageMemory(), - cctx.shared().wal(), - cctx.offheap().globalRemoveId(), - tbl.rowFactory(), - page.pageId().pageId(), - page.isAllocated(), - cols, - inlineIdxs, - computeInlineSize(inlineIdxs, inlineSize)) { - @Override public int compareValues(Value v1, Value v2) { - return v1 == v2 ? 0 : table.compareTypeSafe(v1, v2); - } - }; + db.checkpointReadLock(); + + try { + RootPage page = getMetaPage(name, i); + + segments[i] = new H2Tree( + name, + cctx.offheap().reuseListForIndex(name), + cctx.groupId(), + cctx.dataRegion().pageMemory(), + cctx.shared().wal(), + cctx.offheap().globalRemoveId(), + tbl.rowFactory(), + page.pageId().pageId(), + page.isAllocated(), + cols, + inlineIdxs, + computeInlineSize(inlineIdxs, inlineSize)) { + @Override public int compareValues(Value v1, Value v2) { + return v1 == v2 ? 
0 : table.compareTypeSafe(v1, v2); + } + }; + } + finally { + db.checkpointReadUnlock(); + } } } else { @@ -202,6 +212,8 @@ public class H2TreeIndex extends GridH2IndexBase { H2Tree tree = treeForRead(seg); + assert cctx.shared().database().checkpointLockIsHeldByThread(); + return tree.put(row); } catch (IgniteCheckedException e) { @@ -221,6 +233,8 @@ public class H2TreeIndex extends GridH2IndexBase { H2Tree tree = treeForRead(seg); + assert cctx.shared().database().checkpointLockIsHeldByThread(); + return tree.putx(row); } catch (IgniteCheckedException e) { @@ -240,6 +254,8 @@ public class H2TreeIndex extends GridH2IndexBase { H2Tree tree = treeForRead(seg); + assert cctx.shared().database().checkpointLockIsHeldByThread(); + return tree.remove(row); } catch (IgniteCheckedException e) { @@ -259,6 +275,8 @@ public class H2TreeIndex extends GridH2IndexBase { H2Tree tree = treeForRead(seg); + assert cctx.shared().database().checkpointLockIsHeldByThread(); + return tree.removex(row); } catch (IgniteCheckedException e) { @@ -326,6 +344,8 @@ public class H2TreeIndex extends GridH2IndexBase { @Override public void destroy(boolean rmvIndex) { try { if (cctx.affinityNode() && rmvIndex) { + assert cctx.shared().database().checkpointLockIsHeldByThread(); + for (int i = 0; i < segments.length; i++) { H2Tree tree = segments[i];