IGNITE-8955 Checkpoint can't acquire write lock if massive eviction has started on node start

Signed-off-by: Ivan Rakov <ira...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a0fa79a8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a0fa79a8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a0fa79a8

Branch: refs/heads/ignite-8446
Commit: a0fa79a8d297654973ec8bb890485e8e8818594c
Parents: 6ad291d
Author: Eduard Shangareev <eshangar...@gridgain.com>
Authored: Wed Jul 11 19:43:19 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Wed Jul 11 19:43:19 2018 +0300

----------------------------------------------------------------------
 .../IgniteAuthenticationProcessor.java          |   7 +-
 .../dht/preloader/GridDhtPartitionDemander.java | 107 +++---
 .../GridCacheDatabaseSharedManager.java         | 169 +++++----
 .../persistence/GridCacheOffheapManager.java    |   1 +
 .../persistence/pagemem/PageMemoryImpl.java     |  18 +-
 .../pagemem/PagesWriteSpeedBasedThrottle.java   |  16 +-
 .../persistence/pagemem/PagesWriteThrottle.java |  19 +-
 .../pagemem/PagesWriteThrottlePolicy.java       |   5 +
 .../db/CheckpointBufferDeadlockTest.java        | 358 +++++++++++++++++++
 9 files changed, 577 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
index ac713c3..ded37e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/authentication/IgniteAuthenticationProcessor.java
@@ -1292,9 +1292,10 @@ public class IgniteAuthenticationProcessor extends GridProcessorAdapter implemen
                 // Remove failed operation from active operations.
                 activeOps.remove(op.id());
             }
-
-            if (sharedCtx != null)
-                sharedCtx.database().checkpointReadUnlock();
+            finally {
+                if (sharedCtx != null)
+                    sharedCtx.database().checkpointReadUnlock();
+            }
 
             curOpFinishMsg = msg0;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
index 3cfc25f..1eeebae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionDemander.java
@@ -727,77 +727,86 @@ public class GridDhtPartitionDemander {
         try {
             AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
 
-            ctx.database().checkpointReadLock();
+            // Preload.
+            for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supply.infos().entrySet()) {
+                int p = e.getKey();
 
-            try {
-                // Preload.
-                for (Map.Entry<Integer, CacheEntryInfoCollection> e : 
supply.infos().entrySet()) {
-                    int p = e.getKey();
+                if (aff.get(p).contains(ctx.localNode())) {
+                    GridDhtLocalPartition part = top.localPartition(p, topVer, 
true);
 
-                    if (aff.get(p).contains(ctx.localNode())) {
-                        GridDhtLocalPartition part = top.localPartition(p, 
topVer, true);
+                    assert part != null;
 
-                        assert part != null;
+                    boolean last = supply.last().containsKey(p);
 
-                        boolean last = supply.last().containsKey(p);
+                    if (part.state() == MOVING) {
+                        boolean reserved = part.reserve();
 
-                        if (part.state() == MOVING) {
-                            boolean reserved = part.reserve();
+                        assert reserved : "Failed to reserve partition 
[igniteInstanceName=" +
+                            ctx.igniteInstanceName() + ", grp=" + 
grp.cacheOrGroupName() + ", part=" + part + ']';
 
-                            assert reserved : "Failed to reserve partition 
[igniteInstanceName=" +
-                                ctx.igniteInstanceName() + ", grp=" + 
grp.cacheOrGroupName() + ", part=" + part + ']';
+                        part.lock();
 
-                            part.lock();
+                        try {
+                            Iterator<GridCacheEntryInfo> infos = 
e.getValue().infos().iterator();
 
-                            try {
-                                // Loop through all received entries and try 
to preload them.
-                                for (GridCacheEntryInfo entry : 
e.getValue().infos()) {
-                                    if (!preloadEntry(node, p, entry, topVer)) 
{
-                                        if (log.isDebugEnabled())
-                                            log.debug("Got entries for invalid 
partition during " +
-                                                "preloading (will skip) [p=" + 
p + ", entry=" + entry + ']');
-
-                                        break;
-                                    }
+                            // Loop through all received entries and try to 
preload them.
+                            while (infos.hasNext()) {
+                                ctx.database().checkpointReadLock();
 
-                                    for (GridCacheContext cctx : grp.caches()) 
{
-                                        if (cctx.statisticsEnabled())
-                                            
cctx.cache().metrics0().onRebalanceKeyReceived();
-                                    }
-                                }
+                                try {
+                                    for (int i = 0; i < 100; i++) {
+                                        if (!infos.hasNext())
+                                            break;
+
+                                        GridCacheEntryInfo entry = 
infos.next();
 
-                                // If message was last for this partition,
-                                // then we take ownership.
-                                if (last) {
-                                    fut.partitionDone(nodeId, p, true);
+                                        if (!preloadEntry(node, p, entry, 
topVer)) {
+                                            if (log.isDebugEnabled())
+                                                log.debug("Got entries for 
invalid partition during " +
+                                                        "preloading (will 
skip) [p=" + p + ", entry=" + entry + ']');
 
-                                    if (log.isDebugEnabled())
-                                        log.debug("Finished rebalancing 
partition: " + part);
+                                            break;
+                                        }
+
+                                        for (GridCacheContext cctx : 
grp.caches()) {
+                                            if (cctx.statisticsEnabled())
+                                                
cctx.cache().metrics0().onRebalanceKeyReceived();
+                                        }
+                                    }
+                                }
+                                finally {
+                                    ctx.database().checkpointReadUnlock();
                                 }
                             }
-                            finally {
-                                part.unlock();
-                                part.release();
+
+                            // If message was last for this partition,
+                            // then we take ownership.
+                            if (last) {
+                                fut.partitionDone(nodeId, p, true);
+
+                                if (log.isDebugEnabled())
+                                    log.debug("Finished rebalancing partition: 
" + part);
                             }
                         }
-                        else {
-                            if (last)
-                                fut.partitionDone(nodeId, p, false);
-
-                            if (log.isDebugEnabled())
-                                log.debug("Skipping rebalancing partition 
(state is not MOVING): " + part);
+                        finally {
+                            part.unlock();
+                            part.release();
                         }
                     }
                     else {
-                        fut.partitionDone(nodeId, p, false);
+                        if (last)
+                            fut.partitionDone(nodeId, p, false);
 
                         if (log.isDebugEnabled())
-                            log.debug("Skipping rebalancing partition (it does 
not belong on current node): " + p);
+                            log.debug("Skipping rebalancing partition (state 
is not MOVING): " + part);
                     }
                 }
-            }
-            finally {
-                ctx.database().checkpointReadUnlock();
+                else {
+                    fut.partitionDone(nodeId, p, false);
+
+                    if (log.isDebugEnabled())
+                        log.debug("Skipping rebalancing partition (it does not 
belong on current node): " + p);
+                }
             }
 
             // Only request partitions based on latest topology version.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 6389942..2a0ba44 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1478,7 +1478,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 throw new IgniteException(new NodeStoppingException("Failed to 
perform cache update: node is stopping."));
             }
 
-            if (safeToUpdatePageMemories() || 
checkpointLock.getReadHoldCount() > 1)
+            if (checkpointLock.getReadHoldCount() > 1 || 
safeToUpdatePageMemories())
                 break;
             else {
                 checkpointLock.readLock().unlock();
@@ -2542,6 +2542,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
 
                 Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, 
null);
 
+                assert tag == null || tag != PageMemoryImpl.TRY_AGAIN_TAG :
+                        "Lock is held by other thread for page " + fullId;
+
                 if (tag != null) {
                     tmpWriteBuf.rewind();
 
@@ -3103,7 +3106,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                                     chp.cpPages.innerCollection(i),
                                     updStores,
                                     doneWriteFut,
-                                    totalPagesToWriteCnt
+                                    totalPagesToWriteCnt,
+                                    asyncRunner
                                 );
 
                                 try {
@@ -3117,11 +3121,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                         }
                         else {
                             // Single-threaded checkpoint.
-                            Runnable write = new WriteCheckpointPages(tracker,
+                            Runnable write = new WriteCheckpointPages(
+                                tracker,
                                 chp.cpPages,
                                 updStores,
                                 doneWriteFut,
-                                totalPagesToWriteCnt);
+                                totalPagesToWriteCnt,
+                                null);
 
                             write.run();
                         }
@@ -3752,117 +3758,158 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Pages write task */
     private class WriteCheckpointPages implements Runnable {
         /** */
-        private CheckpointMetricsTracker tracker;
+        private final CheckpointMetricsTracker tracker;
 
         /** Collection of page IDs to write under this task. Overall pages to 
write may be greater than this collection */
-        private Collection<FullPageId> writePageIds;
+        private final Collection<FullPageId> writePageIds;
 
         /** */
-        private ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;
+        private final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;
 
         /** */
-        private CountDownFuture doneFut;
+        private final CountDownFuture doneFut;
 
         /** Total pages to write, counter may be greater than {@link 
#writePageIds} size */
         private final int totalPagesToWrite;
 
+        /** If any pages were skipped, new task with remaining pages will be 
submitted here. */
+        private final ExecutorService retryWriteExecutor;
+
         /**
-         * Creates task for write pages
-         *
-         * @param tracker
-         * @param writePageIds Collection of page IDs to write.
-         * @param updStores
-         * @param doneFut
-         * @param totalPagesToWrite total pages to be written under this 
checkpoint
+         * @param tracker Tracker.
+         * @param writePageIds Write page ids.
+         * @param updStores Upd stores.
+         * @param doneFut Done future.
+         * @param totalPagesToWrite Total pages to write.
+         * @param retryWriteExecutor Retry write executor.
          */
         private WriteCheckpointPages(
             final CheckpointMetricsTracker tracker,
             final Collection<FullPageId> writePageIds,
             final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores,
             final CountDownFuture doneFut,
-            final int totalPagesToWrite) {
+            final int totalPagesToWrite,
+            final ExecutorService retryWriteExecutor
+        ) {
             this.tracker = tracker;
             this.writePageIds = writePageIds;
             this.updStores = updStores;
             this.doneFut = doneFut;
             this.totalPagesToWrite = totalPagesToWrite;
+            this.retryWriteExecutor = retryWriteExecutor;
         }
 
         /** {@inheritDoc} */
         @Override public void run() {
+            snapshotMgr.beforeCheckpointPageWritten();
+
+            Collection<FullPageId> writePageIds = this.writePageIds;
+
+            try {
+                List<FullPageId> pagesToRetry = writePages(writePageIds);
+
+                if (pagesToRetry.isEmpty())
+                    doneFut.onDone((Void)null);
+                else {
+                    if (retryWriteExecutor == null) {
+                        while (!pagesToRetry.isEmpty())
+                            pagesToRetry = writePages(pagesToRetry);
+
+                        doneFut.onDone((Void)null);
+                    }
+                    else {
+                        // Submit current retry pages to the end of the queue 
to avoid starvation.
+                        WriteCheckpointPages retryWritesTask = new 
WriteCheckpointPages(
+                            tracker, pagesToRetry, updStores, doneFut, 
totalPagesToWrite, retryWriteExecutor);
+
+                        retryWriteExecutor.submit(retryWritesTask);
+                    }
+                }
+            }
+            catch (Throwable e) {
+                doneFut.onDone(e);
+            }
+        }
+
+        /**
+         * @param writePageIds Collections of pages to write.
+         * @return pagesToRetry Pages which should be retried.
+         */
+        private List<FullPageId> writePages(Collection<FullPageId> 
writePageIds) throws IgniteCheckedException {
             ByteBuffer tmpWriteBuf = threadBuf.get();
 
             long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
 
-            snapshotMgr.beforeCheckpointPageWritten();
+            List<FullPageId> pagesToRetry = new ArrayList<>();
 
-            try {
-                for (FullPageId fullId : writePageIds) {
-                    if (checkpointer.shutdownNow)
-                        break;
-
-                    tmpWriteBuf.rewind();
+            for (FullPageId fullId : writePageIds) {
+                if (checkpointer.shutdownNow)
+                    break;
 
-                    snapshotMgr.beforePageWrite(fullId);
+                tmpWriteBuf.rewind();
 
-                    int grpId = fullId.groupId();
+                snapshotMgr.beforePageWrite(fullId);
 
-                    PageMemoryEx pageMem;
+                int grpId = fullId.groupId();
 
-                    if (grpId != MetaStorage.METASTORAGE_CACHE_ID) {
-                        CacheGroupContext grp = 
context().cache().cacheGroup(grpId);
+                PageMemoryEx pageMem;
 
-                        if (grp == null)
-                            continue;
+                if (grpId != MetaStorage.METASTORAGE_CACHE_ID) {
+                    CacheGroupContext grp = 
context().cache().cacheGroup(grpId);
 
-                        if (!grp.dataRegion().config().isPersistenceEnabled())
-                            continue;
+                    if (grp == null)
+                        continue;
 
-                        pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
-                    }
-                    else
-                        pageMem = (PageMemoryEx)metaStorage.pageMemory();
+                    if (!grp.dataRegion().config().isPersistenceEnabled())
+                        continue;
 
+                    pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
+                }
+                else
+                    pageMem = (PageMemoryEx)metaStorage.pageMemory();
 
-                    Integer tag = pageMem.getForCheckpoint(
-                        fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() 
? tracker : null);
 
-                    if (tag != null) {
-                        assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid 
state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
-                        assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid 
state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
+                Integer tag = pageMem.getForCheckpoint(
+                    fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? 
tracker : null);
 
-                        tmpWriteBuf.rewind();
+                if (tag != null) {
+                    if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
+                        pagesToRetry.add(fullId);
 
-                        if (persStoreMetrics.metricsEnabled()) {
-                            int pageType = PageIO.getType(tmpWriteBuf);
+                        continue;
+                    }
 
-                            if (PageIO.isDataPageType(pageType))
-                                tracker.onDataPageWritten();
-                        }
+                    assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. 
Type is 0! pageId = " + U.hexLong(fullId.pageId());
+                    assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid 
state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
 
-                        if (!skipCrc) {
-                            PageIO.setCrc(writeAddr, 
PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize()));
+                    tmpWriteBuf.rewind();
 
-                            tmpWriteBuf.rewind();
-                        }
+                    if (persStoreMetrics.metricsEnabled()) {
+                        int pageType = PageIO.getType(tmpWriteBuf);
 
-                        int curWrittenPages = 
writtenPagesCntr.incrementAndGet();
+                        if (PageIO.isDataPageType(pageType))
+                            tracker.onDataPageWritten();
+                    }
 
-                        snapshotMgr.onPageWrite(fullId, tmpWriteBuf, 
curWrittenPages, totalPagesToWrite);
+                    if (!skipCrc) {
+                        PageIO.setCrc(writeAddr, 
PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize()));
 
                         tmpWriteBuf.rewind();
+                    }
 
-                        PageStore store = storeMgr.writeInternal(grpId, 
fullId.pageId(), tmpWriteBuf, tag, false);
+                    int curWrittenPages = writtenPagesCntr.incrementAndGet();
 
-                        updStores.computeIfAbsent(store, k -> new 
LongAdder()).increment();
-                    }
-                }
+                    snapshotMgr.onPageWrite(fullId, tmpWriteBuf, 
curWrittenPages, totalPagesToWrite);
 
-                doneFut.onDone((Void)null);
-            }
-            catch (Throwable e) {
-                doneFut.onDone(e);
+                    tmpWriteBuf.rewind();
+
+                    PageStore store = storeMgr.writeInternal(grpId, 
fullId.pageId(), tmpWriteBuf, tag, false);
+
+                    updStores.computeIfAbsent(store, k -> new 
LongAdder()).increment();
+                }
             }
+
+            return pagesToRetry;
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
index 9861ef9..ea775dc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheOffheapManager.java
@@ -1207,6 +1207,7 @@ public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl imple
                 IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
 
                 dbMgr.checkpointReadLock();
+
                 try {
                     Metas metas = getOrAllocatePartitionMetas();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index a518121..f8f3b57 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -177,6 +177,9 @@ public class PageMemoryImpl implements PageMemoryEx {
     /** Number of random pages that will be picked for eviction. */
     public static final int RANDOM_PAGES_EVICT_NUM = 5;
 
+    /** Try again tag. */
+    public static final int TRY_AGAIN_TAG = -1;
+
     /** Tracking io. */
     private static final TrackingPageIO trackingIO = 
TrackingPageIO.VERSIONS.latest();
 
@@ -377,9 +380,9 @@ public class PageMemoryImpl implements PageMemoryEx {
         if (throttlingPlc == ThrottlingPolicy.SPEED_BASED)
             writeThrottle = new PagesWriteSpeedBasedThrottle(this, 
cpProgressProvider, stateChecker, log);
         else if (throttlingPlc == ThrottlingPolicy.TARGET_RATIO_BASED)
-            writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, 
stateChecker, false);
+            writeThrottle = new PagesWriteThrottle(this, cpProgressProvider, 
stateChecker, false, log);
         else if (throttlingPlc == ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY)
-            writeThrottle = new PagesWriteThrottle(this, null, stateChecker, 
true);
+            writeThrottle = new PagesWriteThrottle(this, null, stateChecker, 
true, log);
     }
 
     /** {@inheritDoc} */
@@ -1118,7 +1121,7 @@ public class PageMemoryImpl implements PageMemoryEx {
             }
         }
         else
-            return copyPageForCheckpoint(absPtr, fullId, outBuf, 
pageSingleAcquire, tracker) ? tag : null;
+            return copyPageForCheckpoint(absPtr, fullId, outBuf, 
pageSingleAcquire, tracker) ? tag : TRY_AGAIN_TAG;
     }
 
     /**
@@ -1128,6 +1131,8 @@ public class PageMemoryImpl implements PageMemoryEx {
      * @param pageSingleAcquire Page is acquired only once. We don't pin the 
page second time (until page will not be
      * copied) in case checkpoint temporary buffer is used.
      * @param tracker Checkpoint statistics tracker.
+     *
+     * @return False if someone else holds lock on page.
      */
     private boolean copyPageForCheckpoint(
         long absPtr,
@@ -1139,7 +1144,10 @@ public class PageMemoryImpl implements PageMemoryEx {
         assert absPtr != 0;
         assert PageHeader.isAcquired(absPtr);
 
-        rwLock.writeLock(absPtr + PAGE_LOCK_OFFSET, 
OffheapReadWriteLock.TAG_LOCK_ALWAYS);
+        boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, 
OffheapReadWriteLock.TAG_LOCK_ALWAYS);
+
+        if (!locked)
+            return false;
 
         try {
             long tmpRelPtr = PageHeader.tempBufferPointer(absPtr);
@@ -2346,6 +2354,8 @@ public class PageMemoryImpl implements PageMemoryEx {
 
             Integer tag = partGenerationMap.get(new GroupPartitionId(grpId, 
partId));
 
+            assert tag == null || tag >= 0 : "Negative tag=" + tag;
+
             return tag == null ? INIT_PART_GENERATION : tag;
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
index 68fa529..2dd8127 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteSpeedBasedThrottle.java
@@ -24,6 +24,7 @@ import org.apache.ignite.IgniteLogger;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Throttles threads that generate dirty pages during ongoing checkpoint.
@@ -113,10 +114,12 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
      * @param stateChecker Checkpoint lock state provider.
      * @param log Logger.
      */
-    public PagesWriteSpeedBasedThrottle(PageMemoryImpl pageMemory,
-        CheckpointWriteProgressSupplier cpProgress,
-        CheckpointLockStateChecker stateChecker,
-        IgniteLogger log) {
+    public PagesWriteSpeedBasedThrottle(
+            PageMemoryImpl pageMemory,
+            CheckpointWriteProgressSupplier cpProgress,
+            CheckpointLockStateChecker stateChecker,
+            IgniteLogger log
+    ) {
         this.pageMemory = pageMemory;
         this.cpProgress = cpProgress;
         totalPages = pageMemory.totalPages();
@@ -229,6 +232,11 @@ public class PagesWriteSpeedBasedThrottle implements PagesWriteThrottlePolicy {
      * @param throttleParkTimeNs the maximum number of nanoseconds to wait
      */
     protected void doPark(long throttleParkTimeNs) {
+        if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+            U.warn(log, "Parking thread=" + Thread.currentThread().getName()
+                + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
+        }
+
         LockSupport.parkNanos(throttleParkTimeNs);
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
index 166cdcd..d5f4bd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottle.java
@@ -18,8 +18,10 @@ package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteLogger;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import 
org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
+import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
  * Throttles threads that generate dirty pages during ongoing checkpoint.
@@ -50,21 +52,27 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
     /** Counter for checkpoint buffer usage ratio throttling (we need a 
separate one due to IGNITE-7751). */
     private final AtomicInteger inCheckpointBackoffCntr = new AtomicInteger(0);
 
+    /** Logger, used to report unusually long throttling parks. */
+    private final IgniteLogger log;
+
     /**
      * @param pageMemory Page memory.
      * @param cpProgress Database manager.
      * @param stateChecker checkpoint lock state checker.
      * @param throttleOnlyPagesInCheckpoint If true, throttle will only 
protect from checkpoint buffer overflow.
+     * @param log Logger.
      */
     public PagesWriteThrottle(PageMemoryImpl pageMemory,
         CheckpointWriteProgressSupplier cpProgress,
         CheckpointLockStateChecker stateChecker,
-        boolean throttleOnlyPagesInCheckpoint
+        boolean throttleOnlyPagesInCheckpoint,
+        IgniteLogger log
     ) {
         this.pageMemory = pageMemory;
         this.cpProgress = cpProgress;
         this.stateChecker = stateChecker;
         this.throttleOnlyPagesInCheckpoint = throttleOnlyPagesInCheckpoint;
+        this.log = log;
 
         if (!throttleOnlyPagesInCheckpoint)
             assert cpProgress != null : "cpProgress must be not null if ratio 
based throttling mode is used";
@@ -111,7 +119,14 @@ public class PagesWriteThrottle implements PagesWriteThrottlePolicy {
         if (shouldThrottle) {
             int throttleLevel = cntr.getAndIncrement();
 
-            LockSupport.parkNanos((long)(STARTING_THROTTLE_NANOS * 
Math.pow(BACKOFF_RATIO, throttleLevel)));
+            long throttleParkTimeNs = (long) (STARTING_THROTTLE_NANOS * 
Math.pow(BACKOFF_RATIO, throttleLevel));
+
+            if (throttleParkTimeNs > LOGGING_THRESHOLD) {
+                U.warn(log, "Parking thread=" + 
Thread.currentThread().getName()
+                    + " for timeout(ms)=" + (throttleParkTimeNs / 1_000_000));
+            }
+
+            LockSupport.parkNanos(throttleParkTimeNs);
         }
         else
             cntr.set(0);

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
index adeaa3d..53a8017 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PagesWriteThrottlePolicy.java
@@ -17,10 +17,15 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.pagemem;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Throttling policy, encapsulates logic of delaying write operations.
  */
 public interface PagesWriteThrottlePolicy {
+    /** Park time threshold in nanoseconds (10 seconds) above which throttling implementations log a warning. */
+    public static final long LOGGING_THRESHOLD = TimeUnit.SECONDS.toNanos(10);
+
     /**
      * Callback to apply throttling delay.
      * @param isPageInCheckpoint flag indicating if current page is in scope 
of current checkpoint.

http://git-wip-us.apache.org/repos/asf/ignite/blob/a0fa79a8/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
new file mode 100644
index 0000000..2b5a65d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/CheckpointBufferDeadlockTest.java
@@ -0,0 +1,422 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.file.OpenOption;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.PageIdAllocator;
+import org.apache.ignite.internal.pagemem.PageIdUtils;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static java.nio.file.StandardOpenOption.CREATE;
+import static java.nio.file.StandardOpenOption.READ;
+import static java.nio.file.StandardOpenOption.WRITE;
+
+/**
+ * Checks that a checkpoint eventually finishes when loader threads dirty pages (and get throttled) while holding
+ * the checkpoint read lock and read locks on other pages, and checkpoint writes are artificially slowed down.
+ */
+public class CheckpointBufferDeadlockTest extends GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Max size. */
+    private static final int MAX_SIZE = 500 * 1024 * 1024;
+
+    /** CP buffer size. */
+    private static final int CP_BUF_SIZE = 20 * 1024 * 1024;
+
+    /** Slow checkpoint enabled. */
+    private static final AtomicBoolean slowCheckpointEnabled = new AtomicBoolean(false);
+
+    /** Checkpoint park nanos. */
+    private static final int CHECKPOINT_PARK_NANOS = 50_000_000;
+
+    /** Entry byte chunk size. */
+    private static final int ENTRY_BYTE_CHUNK_SIZE = 900;
+
+    /** Pages touched under CP lock. */
+    private static final int PAGES_TOUCHED_UNDER_CP_LOCK = 20;
+
+    /** Stop load flag. */
+    private static final AtomicBoolean stop = new AtomicBoolean(false);
+
+    /** Checkpoint threads. */
+    private int checkpointThreads;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setFileIOFactory(new SlowCheckpointFileIOFactory())
+                .setCheckpointThreads(checkpointThreads)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                        .setMaxSize(MAX_SIZE)
+                        .setCheckpointPageBufferSize(CP_BUF_SIZE)
+                )
+        );
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stop.set(false);
+
+        slowCheckpointEnabled.set(false);
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stop.set(true);
+
+        slowCheckpointEnabled.set(false);
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Runs the scenario with four checkpoint threads.
+     *
+     * @throws Exception If failed.
+     */
+    public void testFourCheckpointThreads() throws Exception {
+        checkpointThreads = 4;
+
+        runDeadlockScenario();
+    }
+
+    /**
+     * Runs the scenario with a single checkpoint thread.
+     *
+     * @throws Exception If failed.
+     */
+    public void testOneCheckpointThread() throws Exception {
+        checkpointThreads = 1;
+
+        runDeadlockScenario();
+    }
+
+    /**
+     * Fills a single-partition cache with checkpoints disabled, then re-enables checkpoints with slow checkpoint
+     * writes and runs loader threads (see {@link #touchPages}). After slow mode is turned off, a forced checkpoint
+     * must complete and loader threads must stop without errors.
+     *
+     * @throws Exception If failed.
+     */
+    private void runDeadlockScenario() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().active(true);
+
+        GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ig.context().cache().context().database();
+
+        FilePageStoreManager pageStoreMgr = (FilePageStoreManager)ig.context().cache().context().pageStore();
+
+        IgniteCache<Object, Object> singlePartCache = ig.getOrCreateCache(new CacheConfiguration<>()
+            .setName("single-part")
+            .setAffinity(new RendezvousAffinityFunction(false, 1)));
+
+        db.enableCheckpoints(false).get();
+
+        Thread.sleep(1_000);
+
+        try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(singlePartCache.getName())) {
+            int entries = MAX_SIZE / ENTRY_BYTE_CHUNK_SIZE / 4;
+
+            for (int i = 0; i < entries; i++)
+                streamer.addData(i, new byte[ENTRY_BYTE_CHUNK_SIZE]);
+
+            streamer.flush();
+        }
+
+        slowCheckpointEnabled.set(true);
+        log.info(">>> Slow checkpoints enabled");
+
+        db.enableCheckpoints(true).get();
+
+        AtomicBoolean fail = new AtomicBoolean(false);
+
+        IgniteInternalFuture<?> loadFut = GridTestUtils.runMultiThreadedAsync(new Runnable() {
+            @Override public void run() {
+                int loops = 0;
+
+                while (!stop.get()) {
+                    if ((loops % 10 == 0 && loops > 0 && loops < 500) || (loops % 500 == 0 && loops >= 500))
+                        log.info("Successfully completed " + loops + " loops");
+
+                    db.checkpointReadLock();
+
+                    try {
+                        touchPages(db, pageStoreMgr);
+                    }
+                    catch (Throwable e) {
+                        log.error("Error in loader thread", e);
+
+                        fail.set(true);
+
+                        // touchPages() has already released its page locks; finally releases the checkpoint lock.
+                        break;
+                    }
+                    finally {
+                        db.checkpointReadUnlock();
+                    }
+
+                    loops++;
+                }
+            }
+        }, 10, "load-runner");
+
+        Thread.sleep(10_000); // Await for the start of throttling.
+
+        slowCheckpointEnabled.set(false);
+        log.info(">>> Slow checkpoints disabled");
+
+        assertFalse("Loader threads failed, see log for details", fail.get());
+
+        forceCheckpoint(); // Previous checkpoint should eventually finish.
+
+        stop.set(true);
+
+        loadFut.get();
+
+        // Loader errors may also happen after the first check, while the forced checkpoint was in progress.
+        assertFalse("Loader threads failed, see log for details", fail.get());
+    }
+
+    /**
+     * Picks random data pages of the "single-part" cache partition 0, read locks the first half of them and, while
+     * holding those read locks, write locks the second half and unlocks them with the dirty flag set, which emulates
+     * writes and triggers throttling. Must be called under the checkpoint read lock.
+     *
+     * @param db Database manager.
+     * @param pageStoreMgr Page store manager.
+     * @throws Exception If failed.
+     */
+    private void touchPages(GridCacheDatabaseSharedManager db, FilePageStoreManager pageStoreMgr) throws Exception {
+        int grpId = CU.cacheId("single-part");
+
+        PageStore store = pageStoreMgr.getStore(grpId, 0);
+
+        int pages = store.pages();
+
+        // Page indexes are drawn from [P, pages - P), where P = PAGES_TOUCHED_UNDER_CP_LOCK. Fail fast if this range
+        // is too small to pick P distinct pages (pickPages() would otherwise throw or never terminate).
+        assertTrue("Not enough pages in partition store: " + pages,
+            pages - 2 * PAGES_TOUCHED_UNDER_CP_LOCK >= PAGES_TOUCHED_UNDER_CP_LOCK);
+
+        DataRegion region = db.dataRegion(DataStorageConfiguration.DFLT_DATA_REG_DEFAULT_NAME);
+
+        PageMemoryImpl pageMem = (PageMemoryImpl)region.pageMemory();
+
+        List<FullPageId> pickedPages = pickPages(grpId, pages);
+
+        int half = PAGES_TOUCHED_UNDER_CP_LOCK / 2;
+
+        List<Long> readLockedPages = new ArrayList<>(half);
+
+        try {
+            // Read lock many pages at once intentionally.
+            for (int i = 0; i < half; i++) {
+                FullPageId fpid = pickedPages.get(i);
+
+                long page = pageMem.acquirePage(fpid.groupId(), fpid.pageId());
+
+                long abs = pageMem.readLock(fpid.groupId(), fpid.pageId(), page);
+
+                if (abs == 0) {
+                    pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
+
+                    fail(fpid.toString());
+                }
+
+                readLockedPages.add(page);
+            }
+
+            // Emulate writes to trigger throttling.
+            for (int i = half; i < PAGES_TOUCHED_UNDER_CP_LOCK; i++) {
+                FullPageId fpid = pickedPages.get(i);
+
+                long page = pageMem.acquirePage(fpid.groupId(), fpid.pageId());
+
+                try {
+                    long abs = pageMem.writeLock(fpid.groupId(), fpid.pageId(), page);
+
+                    assertFalse(fpid.toString(), abs == 0);
+
+                    pageMem.writeUnlock(fpid.groupId(), fpid.pageId(), page, null, true);
+                }
+                finally {
+                    pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
+                }
+            }
+        }
+        finally {
+            // Release read locks even if an iteration failed, otherwise the pages would stay locked.
+            for (int i = 0; i < readLockedPages.size(); i++) {
+                FullPageId fpid = pickedPages.get(i);
+
+                long page = readLockedPages.get(i);
+
+                pageMem.readUnlock(fpid.groupId(), fpid.pageId(), page);
+
+                pageMem.releasePage(fpid.groupId(), fpid.pageId(), page);
+            }
+        }
+    }
+
+    /**
+     * Picks {@link #PAGES_TOUCHED_UNDER_CP_LOCK} distinct random data page IDs of partition 0 and sorts them.
+     *
+     * @param grpId Cache group ID.
+     * @param pages Number of pages in the partition store.
+     * @return Sorted list of picked page IDs.
+     */
+    private static List<FullPageId> pickPages(int grpId, int pages) {
+        Set<FullPageId> pickedPagesSet = new HashSet<>();
+
+        while (pickedPagesSet.size() < PAGES_TOUCHED_UNDER_CP_LOCK) {
+            int pageIdx = ThreadLocalRandom.current().nextInt(
+                PAGES_TOUCHED_UNDER_CP_LOCK, pages - PAGES_TOUCHED_UNDER_CP_LOCK);
+
+            long pageId = PageIdUtils.pageId(0, PageIdAllocator.FLAG_DATA, pageIdx);
+
+            pickedPagesSet.add(new FullPageId(pageId, grpId));
+        }
+
+        List<FullPageId> pickedPages = new ArrayList<>(pickedPagesSet);
+
+        // Sort to avoid deadlocks on pages rw-locks.
+        pickedPages.sort(new Comparator<FullPageId>() {
+            @Override public int compare(FullPageId o1, FullPageId o2) {
+                int cmp = Integer.compare(o1.groupId(), o2.groupId());
+
+                if (cmp != 0)
+                    return cmp;
+
+                return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
+                    PageIdUtils.effectivePageId(o2.pageId()));
+            }
+        });
+
+        return pickedPages;
+    }
+
+    /**
+     * File I/O factory whose file I/Os park threads with "checkpoint" in their name on every write while slow
+     * checkpoint mode is enabled, emulating poor checkpoint write speed.
+     */
+    private static class SlowCheckpointFileIOFactory implements FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory();
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file) throws IOException {
+            return create(file, CREATE, READ, WRITE);
+        }
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... openOption) throws IOException {
+            final FileIO delegate = delegateFactory.create(file, openOption);
+
+            return new FileIODecorator(delegate) {
+                @Override public int write(ByteBuffer srcBuf) throws IOException {
+                    parkIfNeeded();
+
+                    return delegate.write(srcBuf);
+                }
+
+                @Override public int write(ByteBuffer srcBuf, long position) throws IOException {
+                    parkIfNeeded();
+
+                    return delegate.write(srcBuf, position);
+                }
+
+                @Override public int write(byte[] buf, int off, int len) throws IOException {
+                    parkIfNeeded();
+
+                    return delegate.write(buf, off, len);
+                }
+
+                /**
+                 * Parks current checkpoint thread if slow mode is enabled.
+                 */
+                private void parkIfNeeded() {
+                    if (slowCheckpointEnabled.get() && Thread.currentThread().getName().contains("checkpoint"))
+                        LockSupport.parkNanos(CHECKPOINT_PARK_NANOS);
+                }
+
+                /** {@inheritDoc} */
+                @Override public MappedByteBuffer map(int sizeBytes) throws IOException {
+                    return delegate.map(sizeBytes);
+                }
+            };
+        }
+    }
+}

Reply via email to