IGNITE-6033 Added sorted and multithreaded modes to the checkpointing algorithm - Fixes #2441.
Signed-off-by: Alexey Goncharuk <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/69e6f8b2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/69e6f8b2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/69e6f8b2 Branch: refs/heads/ignite-5578 Commit: 69e6f8b201a13f1c780948237960267c42ac5d2d Parents: b417a36 Author: Ivan Rakov <[email protected]> Authored: Thu Aug 17 15:54:21 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Thu Aug 17 15:54:21 2017 +0300 ---------------------------------------------------------------------- .../configuration/CheckpointWriteOrder.java | 33 ++++++++ .../PersistentStoreConfiguration.java | 26 +++++++ .../GridCacheDatabaseSharedManager.java | 82 +++++++++++++++----- ...nitePersistenceSequentialCheckpointTest.java | 44 +++++++++++ .../IgnitePersistentStoreCacheGroupsTest.java | 31 ++++---- 5 files changed, 183 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java new file mode 100644 index 0000000..31feaf6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CheckpointWriteOrder.java @@ -0,0 +1,33 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. 
+* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.configuration; + +/** + * This enum defines order of writing pages to disk storage during checkpoint. + */ +public enum CheckpointWriteOrder { + /** + * Pages are written in order provided by checkpoint pages collection iterator (which is basically a hashtable). + */ + RANDOM, + + /** + * All checkpoint pages are collected into single list and sorted by page index. + * Provides almost sequential disk writes, which can be much faster on some SSD models. + */ + SEQUENTIAL +} http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index e8a0ff4..5b902ac 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -47,6 +47,9 @@ public class PersistentStoreConfiguration implements Serializable { /** Default number of checkpointing threads. */ public static final int DFLT_CHECKPOINTING_THREADS = 1; + /** Default checkpoint write order. 
*/ + public static final CheckpointWriteOrder DFLT_CHECKPOINT_WRITE_ORDER = CheckpointWriteOrder.RANDOM; + /** Default number of checkpoints to be kept in WAL after checkpoint is finished */ public static final int DFLT_WAL_HISTORY_SIZE = 20; @@ -95,6 +98,9 @@ public class PersistentStoreConfiguration implements Serializable { /** */ private int checkpointingThreads = DFLT_CHECKPOINTING_THREADS; + /** Checkpoint write order. */ + private CheckpointWriteOrder checkpointWriteOrder = DFLT_CHECKPOINT_WRITE_ORDER; + /** Number of checkpoints to keep */ private int walHistSize = DFLT_WAL_HISTORY_SIZE; @@ -587,6 +593,26 @@ public class PersistentStoreConfiguration implements Serializable { return walAutoArchiveAfterInactivity; } + /** + * This property defines order of writing pages to disk storage during checkpoint. + * + * @return Checkpoint write order. + */ + public CheckpointWriteOrder getCheckpointWriteOrder() { + return checkpointWriteOrder; + } + + /** + * This property defines order of writing pages to disk storage during checkpoint. + * + * @param checkpointWriteOrder Checkpoint write order. 
+ */ + public PersistentStoreConfiguration setCheckpointWriteOrder(CheckpointWriteOrder checkpointWriteOrder) { + this.checkpointWriteOrder = checkpointWriteOrder; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(PersistentStoreConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index 99e05dd..3c7ba28 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -49,7 +49,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -62,6 +61,7 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.PersistenceMetrics; +import org.apache.ignite.configuration.CheckpointWriteOrder; import org.apache.ignite.configuration.DataPageEvictionMode; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.MemoryConfiguration; @@ -93,7 +93,6 @@ import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor; @@ -135,6 +134,7 @@ import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.mxbean.PersistenceMetricsMXBean; import org.apache.ignite.thread.IgniteThread; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -382,11 +382,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan long cpBufSize = persistenceCfg.getCheckpointingPageBufferSize(); if (persistenceCfg.getCheckpointingThreads() > 1) - asyncRunner = new ThreadPoolExecutor( + asyncRunner = new IgniteThreadPoolExecutor( + "checkpoint-runner", + cctx.igniteInstanceName(), persistenceCfg.getCheckpointingThreads(), persistenceCfg.getCheckpointingThreads(), - 30L, - TimeUnit.SECONDS, + 30_000, new LinkedBlockingQueue<Runnable>() ); @@ -2084,10 +2085,10 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan WALPointer cpPtr = null; - GridMultiCollectionWrapper<FullPageId> cpPages; - final CheckpointProgress curr; + IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple; + tracker.onLockWaitStart(); checkpointLock.writeLock().lock(); @@ -2152,19 +2153,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (curr.nextSnapshot) 
snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map); - IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> tup = beginAllCheckpoints(); + cpPagesTuple = beginAllCheckpoints(); - // Todo it maybe more optimally - Collection<FullPageId> cpPagesList = new ArrayList<>(tup.get2()); - - for (GridMultiCollectionWrapper<FullPageId> col : tup.get1()) { - for (int i = 0; i < col.collectionsSize(); i++) - cpPagesList.addAll(col.innerCollection(i)); - } - - cpPages = new GridMultiCollectionWrapper<>(cpPagesList); - - if (!F.isEmpty(cpPages)) { + if (!F.isEmpty(cpPagesTuple.get1())) { // No page updates for this checkpoint are allowed from now on. cpPtr = cctx.wal().log(cpRec); @@ -2180,7 +2171,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan curr.cpBeginFut.onDone(); - if (!F.isEmpty(cpPages)) { + if (!F.isEmpty(cpPagesTuple.get1())) { assert cpPtr != null; // Sync log outside the checkpoint write lock. @@ -2198,6 +2189,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan checkpointHist.addCheckpointEntry(cpEntry); + GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple); + if (printCheckpointStats) if (log.isInfoEnabled()) log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " + @@ -2295,6 +2288,55 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } + /** + * Reorders list of checkpoint pages and splits them into needed number of sublists according to + * {@link PersistentStoreConfiguration#getCheckpointingThreads()} and + * {@link PersistentStoreConfiguration#getCheckpointWriteOrder()}. + * + * @param cpPagesTuple Checkpoint pages tuple. 
+ */ + private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded( + IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple) { + List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2()); + + for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1()) { + for (int i = 0; i < col.collectionsSize(); i++) + cpPagesList.addAll(col.innerCollection(i)); + } + + if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) { + Collections.sort(cpPagesList, new Comparator<FullPageId>() { + @Override public int compare(FullPageId o1, FullPageId o2) { + int cmp = Long.compare(o1.groupId(), o2.groupId()); + if (cmp != 0) + return cmp; + + return Long.compare(PageIdUtils.effectivePageId(o1.pageId()), + PageIdUtils.effectivePageId(o2.pageId())); + } + }); + } + + int cpThreads = persistenceCfg.getCheckpointingThreads(); + + int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4; + // Splitting pages to (threads * 4) subtasks. If any thread will be faster, it will help slower threads. 
+ + Collection[] pagesSubListArr = new Collection[pagesSubLists]; + + for (int i = 0; i < pagesSubLists; i++) { + int totalSize = cpPagesList.size(); + + int from = totalSize * i / (pagesSubLists); + + int to = totalSize * (i + 1) / (pagesSubLists); + + pagesSubListArr[i] = cpPagesList.subList(from, to); + } + + return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr); + } + /** Pages write task */ private class WriteCheckpointPages implements Runnable { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java new file mode 100644 index 0000000..9295000 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistenceSequentialCheckpointTest.java @@ -0,0 +1,44 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence; + +import org.apache.ignite.configuration.CheckpointWriteOrder; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.PersistentStoreConfiguration; +import org.apache.ignite.configuration.WALMode; + +/** + * + */ +public class IgnitePersistenceSequentialCheckpointTest extends IgnitePersistentStoreCacheGroupsTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setPersistentStoreConfiguration(new PersistentStoreConfiguration() + .setWalMode(WALMode.LOG_ONLY) + .setCheckpointingThreads(4) + .setCheckpointWriteOrder(CheckpointWriteOrder.SEQUENTIAL)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected int entriesCount() { + return 1000; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/69e6f8b2/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java index a945c73..b39b8cb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePersistentStoreCacheGroupsTest.java @@ -87,7 +87,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest MemoryConfiguration memCfg = new MemoryConfiguration(); 
memCfg.setPageSize(1024); - memCfg.setDefaultMemoryPolicySize(10 * 1024 * 1024); + memCfg.setDefaultMemoryPolicySize(100 * 1024 * 1024); cfg.setMemoryConfiguration(memCfg); @@ -115,6 +115,11 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest super.afterTest(); } + /** Entries count. */ + protected int entriesCount() { + return 10; + } + /** * @throws Exception If failed. */ @@ -236,7 +241,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName).withExpiryPolicy(plc); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) cache.put(i, cacheName + i); } @@ -253,10 +258,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(cacheName + i, cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } // Wait for expiration. 
@@ -340,7 +345,7 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) cache.put(i, new Person("" + i, cacheName)); } } @@ -353,10 +358,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(new Person("" + i, cacheName), cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } } @@ -373,10 +378,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest List<Cache.Entry<Integer, Person>> persons = cache.query(qry.setArgs(cacheName)).getAll(); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(new Person("" + i, cacheName), persons.get(i).getValue()); - assertEquals(10, persons.size()); + assertEquals(entriesCount(), persons.size()); } } @@ -413,13 +418,13 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) { + for (int i = 0; i < entriesCount(); i++) { cache.put(i, cacheName + i); assertEquals(cacheName + i, cache.get(i)); } - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } stopAllGrids(); @@ -433,10 +438,10 @@ public class IgnitePersistentStoreCacheGroupsTest extends GridCommonAbstractTest for (String cacheName : caches) { IgniteCache<Object, Object> cache = node.cache(cacheName); - for (int i = 0; i < 10; i++) + for (int i = 0; i < entriesCount(); i++) assertEquals(cacheName + i, cache.get(i)); - assertEquals(10, cache.size()); + assertEquals(entriesCount(), cache.size()); } }
