Repository: ignite
Updated Branches:
  refs/heads/ignite-10508-pagememory [created] ca09cebcf


IGNITE-10508 page memory updates

Signed-off-by: Dmitriy Govorukhin <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca09cebc
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca09cebc
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca09cebc

Branch: refs/heads/ignite-10508-pagememory
Commit: ca09cebcf7ab7a655dbba7543b150868e2de13ab
Parents: ca8a80b
Author: Dmitriy Govorukhin <[email protected]>
Authored: Wed Dec 19 14:46:19 2018 +0300
Committer: Dmitriy Govorukhin <[email protected]>
Committed: Wed Dec 19 14:46:19 2018 +0300

----------------------------------------------------------------------
 .../persistence/pagemem/PageMemoryImpl.java     | 199 ++++++++++++++-----
 1 file changed, 147 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ca09cebc/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 5f16ce8..9de79cd 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -126,9 +126,12 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** */
     public static final long PAGE_MARKER = 0x0000000000000001L;
 
-    /** Relative pointer chunk index mask. */
+    /** Relative pointer chunk segment index mask. */
     private static final long SEGMENT_INDEX_MASK = 0xFFFFFF0000000000L;
 
+    /** Relative pointer chunk checkpoint index mask. */
+    private static final long CHECKPOINT_INDEX_MASK = 0xFF00000000000000L;
+
     /** Full relative pointer mask. */
     private static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
 
@@ -879,15 +882,10 @@ public class PageMemoryImpl implements PageMemoryEx {
         if (rmv)
             seg.loadedPages.remove(grpId, PageIdUtils.effectivePageId(pageId));
 
-        Collection<FullPageId> cpPages = seg.segCheckpointPages;
-
-        if (cpPages != null)
-            cpPages.remove(new FullPageId(pageId, grpId));
-
-        Collection<FullPageId> dirtyPages = seg.dirtyPages;
+        FullPageId fullPageId = new FullPageId(pageId, grpId);
 
-        if (dirtyPages != null)
-            dirtyPages.remove(new FullPageId(pageId, grpId));
+        seg.removeCheckpointPage(fullPageId);
+        seg.removeDirtyPage(fullPageId);
 
         return relPtr;
     }
@@ -1050,15 +1048,21 @@ public class PageMemoryImpl implements PageMemoryEx {
         if (segments == null)
             return new 
GridMultiCollectionWrapper<>(Collections.<FullPageId>emptyList());
 
-        Collection[] collections = new Collection[segments.length];
+        Collection[] collections = new Collection[segments.length * 2];
 
         for (int i = 0; i < segments.length; i++) {
             Segment seg = segments[i];
 
-            if (seg.segCheckpointPages != null)
-                throw new IgniteException("Failed to begin checkpoint (it is 
already in progress).");
+            // If curr != null - previous checkpoint was canceled.
+            if (seg.currChpPages != null) {
+                collections[(i * 2) + 1] = seg.prevChpPages = seg.currChpPages;
 
-            collections[i] = seg.segCheckpointPages = seg.dirtyPages;
+                seg.checkpointIdx++;
+            }
+            else
+                collections[(i * 2) + 1] = Collections.emptySet();
+
+            collections[(i * 2)] = seg.currChpPages = seg.dirtyPages;
 
             seg.dirtyPages = new GridConcurrentHashSet<>();
         }
@@ -1083,8 +1087,11 @@ public class PageMemoryImpl implements PageMemoryEx {
         if (segments == null)
             return;
 
-        for (Segment seg : segments)
-            seg.segCheckpointPages = null;
+        for (Segment seg : segments) {
+            seg.prevChpPages = null;
+            seg.currChpPages = null;
+            seg.checkpointIdx = 0;
+        }
 
         if (throttlingPlc != ThrottlingPolicy.DISABLED)
             writeThrottle.onFinishCheckpoint();
@@ -1212,7 +1219,11 @@ public class PageMemoryImpl implements PageMemoryEx {
         try {
             long tmpRelPtr = PageHeader.tempBufferPointer(absPtr);
 
-            boolean success = clearCheckpoint(fullId);
+            boolean pageCopyed = false;
+
+            byte checkpointIdx = (byte)((tmpRelPtr & CHECKPOINT_INDEX_MASK) >> 
56);
+
+            boolean success = removeCheckpointPage(fullId);
 
             assert success : "Page was pin when we resolve abs pointer, it can 
not be evicted";
 
@@ -1221,22 +1232,31 @@ public class PageMemoryImpl implements PageMemoryEx {
 
                 long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
 
-                copyInBuffer(tmpAbsPtr, outBuf);
+                if (checkpointIdx == currentCheckpointIdx(fullId)) {
+                    copyInBuffer(tmpAbsPtr, outBuf);
 
-                GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), 
(byte)0);
+                    GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, 
pageSize(), (byte)0);
 
-                if (tracker != null)
-                    tracker.onCowPageWritten();
+                    if (tracker != null)
+                        tracker.onCowPageWritten();
 
-                checkpointPool.releaseFreePage(tmpRelPtr);
+                    checkpointPool.releaseFreePage(tmpRelPtr);
 
-                // Need release again because we pin page when resolve abs 
pointer,
-                // and page did not have tmp buffer page.
-                if (!pageSingleAcquire)
-                    PageHeader.releasePage(absPtr);
+                    // Need release again because we pin page when resolve abs 
pointer,
+                    // and page did not have tmp buffer page.
+                    if (!pageSingleAcquire)
+                        PageHeader.releasePage(absPtr);
+
+                    pageCopyed = true;
+                }
+                else {
+                    GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, 
pageSize(), (byte)0);
 
+                    checkpointPool.releaseFreePage(tmpRelPtr);
+                }
             }
-            else {
+
+            if (!pageCopyed) {
                 copyInBuffer(absPtr, outBuf);
 
                 PageHeader.dirty(absPtr, false);
@@ -1470,6 +1490,10 @@ public class PageMemoryImpl implements PageMemoryEx {
         return locked ? postWriteLockPage(absPtr, fullId) : 0;
     }
 
+    private static long setCheckpointId(long tmpRelPtr, byte chpIdx) {
+        return (tmpRelPtr & ~CHECKPOINT_INDEX_MASK) & ((((int)chpIdx) << 56) | 
~CHECKPOINT_INDEX_MASK);
+    }
+
     /**
      * @param absPtr Absolute pointer.
      * @return Pointer to the page write buffer.
@@ -1477,18 +1501,45 @@ public class PageMemoryImpl implements PageMemoryEx {
     private long postWriteLockPage(long absPtr, FullPageId fullId) {
         PageHeader.writeTimestamp(absPtr, U.currentTimeMillis());
 
+        // Current checkpoint idx.
+        byte currChpIdx = currentCheckpointIdx(fullId);
+
         // Create a buffer copy if the page is scheduled for a checkpoint.
-        if (isInCheckpoint(fullId) && PageHeader.tempBufferPointer(absPtr) == 
INVALID_REL_PTR) {
-            long tmpRelPtr = 
checkpointPool.borrowOrAllocateFreePage(fullId.pageId());
+        if (isInCheckpoint(fullId)) {
+            long tmpBufferPointer = PageHeader.tempBufferPointer(absPtr);
+
+            long tmpRelPtr;
+
+            // If buffer was not created befor.
+            if (tmpBufferPointer == INVALID_REL_PTR) {
+                tmpRelPtr = 
checkpointPool.borrowOrAllocateFreePage(fullId.pageId());
 
-            if (tmpRelPtr == INVALID_REL_PTR) {
-                rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, 
OffheapReadWriteLock.TAG_LOCK_ALWAYS);
+                if (tmpRelPtr == INVALID_REL_PTR) {
+                    rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, 
OffheapReadWriteLock.TAG_LOCK_ALWAYS);
 
-                throw new IgniteException(CHECKPOINT_POOL_OVERFLOW_ERROR_MSG + 
": " + memMetrics.getName());
+                    throw new 
IgniteException(CHECKPOINT_POOL_OVERFLOW_ERROR_MSG + ": " + 
memMetrics.getName());
+                }
+
+                tmpRelPtr = setCheckpointId(tmpRelPtr, currChpIdx);
+
+                // Pin the page until checkpoint is not finished.
+                PageHeader.acquirePage(absPtr);
             }
+            else {
+                byte tmpBufferChpIdx = (byte)((tmpBufferPointer & 
CHECKPOINT_INDEX_MASK) >> 56);
 
-            // Pin the page until checkpoint is not finished.
-            PageHeader.acquirePage(absPtr);
+                // Check buffer checkpoint id and current checkpoint id.
+                // If they not equal we can rewrite page content with new page 
state.
+                if (currChpIdx != tmpBufferChpIdx) {
+                    tmpRelPtr = setCheckpointId(tmpBufferPointer, currChpIdx);
+                }
+                else {
+                    // We have buffer with alredy coppied page for this 
checkpoint.
+                    assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO 
GG-11480
+
+                    return absPtr + PAGE_OVERHEAD;
+                }
+            }
 
             long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
 
@@ -1581,29 +1632,33 @@ public class PageMemoryImpl implements PageMemoryEx {
     }
 
     /**
-     * @param pageId Page ID to check if it was added to the checkpoint list.
+     * @param fullPageId Page ID to check if it was added to the checkpoint 
list.
      * @return {@code True} if it was added to the checkpoint list.
      */
-    boolean isInCheckpoint(FullPageId pageId) {
-        Segment seg = segment(pageId.groupId(), pageId.pageId());
-
-        Collection<FullPageId> pages0 = seg.segCheckpointPages;
+    boolean isInCheckpoint(FullPageId fullPageId) {
+        Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
 
-        return pages0 != null && pages0.contains(pageId);
+        return seg.isInCheckpoint(fullPageId);
     }
 
     /**
-     * @param fullPageId Page ID to clear.
-     * @return {@code True} if remove successfully.
+     * @param fullPageId
+     * @return
      */
-    boolean clearCheckpoint(FullPageId fullPageId) {
+    byte currentCheckpointIdx(FullPageId fullPageId){
         Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
 
-        Collection<FullPageId> pages0 = seg.segCheckpointPages;
+        return seg.currentCheckpointIdx(fullPageId);
+    }
 
-        assert pages0 != null;
+    /**
+     * @param fullPageId Page ID to check if it was added to the checkpoint 
list.
+     * @return {@code True} if it was added to the checkpoint list.
+     */
+    boolean removeCheckpointPage(FullPageId fullPageId) {
+        Segment seg = segment(fullPageId.groupId(), fullPageId.pageId());
 
-        return pages0.remove(fullPageId);
+        return seg.removeCheckpointPage(fullPageId);
     }
 
     /**
@@ -1921,7 +1976,13 @@ public class PageMemoryImpl implements PageMemoryEx {
         private volatile Collection<FullPageId> dirtyPages = new 
GridConcurrentHashSet<>();
 
         /** */
-        private volatile Collection<FullPageId> segCheckpointPages;
+        private byte checkpointIdx;
+
+        /** */
+        private volatile Collection<FullPageId> prevChpPages;
+
+        /** */
+        private volatile Collection<FullPageId> currChpPages;
 
         /** */
         private final int maxDirtyPages;
@@ -1986,6 +2047,44 @@ public class PageMemoryImpl implements PageMemoryEx {
         /**
          *
          */
+        private byte currentCheckpointIdx(FullPageId fullPageId) {
+            return checkpointIdx;
+        }
+
+        /**
+         *
+         */
+        private boolean removeCheckpointPage(FullPageId fullPageId) {
+            Collection<FullPageId> prevChpPages0 = prevChpPages;
+            Collection<FullPageId> currChpPages0 = currChpPages;
+
+            return (prevChpPages0 != null && prevChpPages0.remove(fullPageId)) 
|
+                (currChpPages0 != null && currChpPages0.remove(fullPageId));
+        }
+
+        /**
+         *
+         */
+        private boolean removeDirtyPage(FullPageId fullPageId) {
+            Collection<FullPageId> dirtyPages0 = dirtyPages;
+
+            return dirtyPages0 != null && dirtyPages0.remove(fullPageId);
+        }
+
+        /**
+         *
+         */
+        private boolean isInCheckpoint(FullPageId fullPageId) {
+            Collection<FullPageId> prevChpPages0 = prevChpPages;
+            Collection<FullPageId> currChpPages0 = currChpPages;
+
+            return (prevChpPages0 != null && 
prevChpPages0.contains(fullPageId)) ||
+                (currChpPages0 != null && currChpPages0.contains(fullPageId));
+        }
+
+        /**
+         *
+         */
         private boolean safeToUpdate() {
             return dirtyPages.size() < maxDirtyPages;
         }
@@ -2070,14 +2169,12 @@ public class PageMemoryImpl implements PageMemoryEx {
             if (PageHeader.isAcquired(absPtr))
                 return false;
 
-            Collection<FullPageId> cpPages = segCheckpointPages;
-
             clearRowCache(fullPageId, absPtr);
 
             if (isDirty(absPtr)) {
                 // Can evict a dirty page only if should be written by a 
checkpoint.
                 // These pages does not have tmp buffer.
-                if (cpPages != null && cpPages.contains(fullPageId)) {
+                if (removeCheckpointPage(fullPageId)) {
                     assert storeMgr != null;
 
                     memMetrics.updatePageReplaceRate(U.currentTimeMillis() - 
PageHeader.readTimestamp(absPtr));
@@ -2093,8 +2190,6 @@ public class PageMemoryImpl implements PageMemoryEx {
 
                     setDirty(fullPageId, absPtr, false, true);
 
-                    cpPages.remove(fullPageId);
-
                     return true;
                 }
 
@@ -2380,7 +2475,7 @@ public class PageMemoryImpl implements PageMemoryEx {
                 ", loaded=" + loadedPages.size() +
                 ", maxDirtyPages=" + maxDirtyPages +
                 ", dirtyPages=" + dirtyPages.size() +
-                ", cpPages=" + (segCheckpointPages == null ? 0 : 
segCheckpointPages.size()) +
+                ", cpPages=" + (currChpPages == null ? 0 : 
currChpPages.size()) +
                 ", pinnedInSegment=" + pinnedCnt +
                 ", failedToPrepare=" + failToPrepare +
                 ']' + U.nl() + "Out of memory in data region [" +

Reply via email to