This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 493759a IGNITE-13709 Control.sh API - status command for defragmentation feature - Fixes #8548. 493759a is described below commit 493759a5aa43d2a026b56552cf26cd72e0ed96a9 Author: ibessonov <bessonov...@gmail.com> AuthorDate: Wed Dec 9 09:54:58 2020 +0300 IGNITE-13709 Control.sh API - status command for defragmentation feature - Fixes #8548. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../commandline/DefragmentationCommand.java | 13 +- .../GridCommandHandlerDefragmentationTest.java | 141 ++++- .../CachePartitionDefragmentationManager.java | 577 +++++++++++++++------ 3 files changed, 564 insertions(+), 167 deletions(-) diff --git a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java index c2fa8e9..ec5c1f0 100644 --- a/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java +++ b/modules/control-utility/src/main/java/org/apache/ignite/internal/commandline/DefragmentationCommand.java @@ -111,7 +111,7 @@ public class DefragmentationCommand implements Command<DefragmentationArguments> @Override public void parseArguments(CommandArgIterator argIter) { DefragmentationSubcommands cmd = DefragmentationSubcommands.of(argIter.nextArg("Expected defragmentation subcommand.")); - if (cmd == null || cmd == DefragmentationSubcommands.STATUS) // Status subcommand is not yet completed. 
+ if (cmd == null) throw new IllegalArgumentException("Expected correct defragmentation subcommand."); args = new DefragmentationArguments(cmd); @@ -124,10 +124,17 @@ public class DefragmentationCommand implements Command<DefragmentationArguments> String subarg; do { - subarg = argIter.nextArg("Expected one of subcommand arguments.").toLowerCase(Locale.ENGLISH); + subarg = argIter.peekNextArg(); + + if (subarg == null) + break; + + subarg = subarg.toLowerCase(Locale.ENGLISH); switch (subarg) { case NODES_ARG: { + argIter.nextArg(""); + Set<String> ids = argIter.nextStringSet(NODES_ARG); if (ids.isEmpty()) @@ -139,6 +146,8 @@ public class DefragmentationCommand implements Command<DefragmentationArguments> } case CACHES_ARG: { + argIter.nextArg(""); + Set<String> ids = argIter.nextStringSet(CACHES_ARG); if (ids.isEmpty()) diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java index adce9e2..c2ea3c2 100644 --- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java +++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerDefragmentationTest.java @@ -17,13 +17,16 @@ package org.apache.ignite.util; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.function.UnaryOperator; import java.util.logging.Formatter; import java.util.logging.LogRecord; import java.util.logging.Logger; import java.util.logging.StreamHandler; +import java.util.regex.Pattern; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cluster.ClusterState; @@ -33,6 +36,7 @@ import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.IgniteNodeAttributes; import 
org.apache.ignite.internal.commandline.CommandHandler; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.defragmentation.maintenance.DefragmentationParameters; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.maintenance.MaintenanceTask; @@ -50,6 +54,9 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu /** */ private static CountDownLatch blockCdl; + /** */ + private static CountDownLatch waitCdl; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); @@ -205,7 +212,9 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu assertTrue(logLsnr.check()); } - /** */ + /** + * @throws Exception If failed. + */ @Test public void testDefragmentationCancelInProgress() throws Exception { IgniteEx ig = startGrid(0); @@ -311,6 +320,136 @@ public class GridCommandHandlerDefragmentationTest extends GridCommandHandlerClu assertTrue(logLsnr.check()); } + /** + * @throws Exception If failed. 
+ */ + @Test + public void testDefragmentationStatus() throws Exception { + IgniteEx ig = startGrid(0); + + ig.cluster().state(ClusterState.ACTIVE); + + ig.getOrCreateCache(DEFAULT_CACHE_NAME + "1"); + + IgniteCache<Object, Object> cache = ig.getOrCreateCache(DEFAULT_CACHE_NAME + "2"); + + ig.getOrCreateCache(DEFAULT_CACHE_NAME + "3"); + + for (int i = 0; i < 1024; i++) + cache.put(i, i); + + forceCheckpoint(ig); + + String grid0ConsId = ig.configuration().getConsistentId().toString(); + + ListeningTestLogger testLog = new ListeningTestLogger(); + + CommandHandler cmd = createCommandHandler(testLog); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--defragmentation", + "schedule", + "--nodes", + grid0ConsId + )); + + String port = grid(0).localNode().attribute(IgniteNodeAttributes.ATTR_REST_TCP_PORT).toString(); + + stopGrid(0); + + blockCdl = new CountDownLatch(128); + waitCdl = new CountDownLatch(1); + + UnaryOperator<IgniteConfiguration> cfgOp = cfg -> { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + + FileIOFactory delegate = dsCfg.getFileIOFactory(); + + dsCfg.setFileIOFactory((file, modes) -> { + if (file.getName().contains("dfrg")) { + if (blockCdl.getCount() == 0) { + try { + waitCdl.await(); + } + catch (InterruptedException ignore) { + // No-op. + } + } + else + blockCdl.countDown(); + } + + return delegate.create(file, modes); + }); + + return cfg; + }; + + IgniteInternalFuture<?> fut = GridTestUtils.runAsync(() -> { + try { + startGrid(0, cfgOp); + } + catch (Exception e) { + // No-op. 
+ throw new RuntimeException(e); + } + }); + + blockCdl.await(); + + List<LogListener> logLsnrs = Arrays.asList( + LogListener.matches("default1 - size before/after: 0MB/0MB").build(), + LogListener.matches("default2 - partitions processed/all:").build(), + LogListener.matches("Awaiting defragmentation: default3").build() + ); + + for (LogListener logLsnr : logLsnrs) + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--port", + port, + "--defragmentation", + "status" + )); + + waitCdl.countDown(); + + for (LogListener logLsnr : logLsnrs) + assertTrue(logLsnr.check()); + + fut.get(); + + ((GridCacheDatabaseSharedManager)grid(0).context().cache().context().database()) + .defragmentationManager() + .completionFuture() + .get(); + + testLog.clearListeners(); + + logLsnrs = Arrays.asList( + LogListener.matches("default1 - size before/after: 0MB/0MB").build(), + LogListener.matches(Pattern.compile("default2 - size before/after: (\\S+)/\\1")).build(), + LogListener.matches("default3 - size before/after: 0MB/0MB").build() + ); + + for (LogListener logLsnr : logLsnrs) + testLog.registerListener(logLsnr); + + assertEquals(EXIT_CODE_OK, execute( + cmd, + "--port", + port, + "--defragmentation", + "status" + )); + + for (LogListener logLsnr : logLsnrs) + assertTrue(logLsnr.check()); + } + /** */ private CommandHandler createCommandHandler(ListeningTestLogger testLog) { Logger log = CommandHandler.initLogger(null); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java index 41999fb..75b3458 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java @@ -19,11 +19,16 @@ package org.apache.ignite.internal.processors.cache.persistence.defragmentation; import java.io.File; import java.nio.file.Path; +import java.text.DecimalFormat; +import java.text.DecimalFormatSymbols; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -66,6 +71,7 @@ import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; import org.apache.ignite.internal.processors.cache.tree.PendingRow; import org.apache.ignite.internal.processors.query.GridQueryIndexing; import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.util.GridAtomicLong; import org.apache.ignite.internal.util.collection.IntHashMap; import org.apache.ignite.internal.util.collection.IntMap; import org.apache.ignite.internal.util.future.GridCompoundFuture; @@ -78,9 +84,11 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.maintenance.MaintenanceRegistry; +import static java.util.Comparator.comparing; import static java.util.stream.StreamSupport.stream; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA; import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED; import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_MAPPING_REGION_NAME; 
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.DEFRAGMENTATION_PART_REGION_NAME; @@ -105,6 +113,9 @@ public class CachePartitionDefragmentationManager { /** */ private final Set<String> cachesForDefragmentation; + /** */ + private final Set<CacheGroupContext> cacheGrpCtxsForDefragmentation = new TreeSet<>(comparing(CacheGroupContext::cacheOrGroupName)); + /** Cache shared context. */ private final GridCacheSharedContext<?, ?> sharedCtx; @@ -142,6 +153,9 @@ public class CachePartitionDefragmentationManager { private final AtomicBoolean cancel = new AtomicBoolean(); /** */ + private final DefragmentationStatus status = new DefragmentationStatus(); + + /** */ private final GridFutureAdapter<?> completionFut = new GridFutureAdapter<>(); /** @@ -190,11 +204,23 @@ public class CachePartitionDefragmentationManager { dbMgr.preserveWalTailPointer(); sharedCtx.wal().onDeActivate(sharedCtx.kernalContext()); + + for (CacheGroupContext oldGrpCtx : sharedCtx.cache().cacheGroups()) { + if (!oldGrpCtx.userCache() || cacheGrpCtxsForDefragmentation.contains(oldGrpCtx)) + continue; + + if (!cachesForDefragmentation.isEmpty()) { + if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name()))) + continue; + } + + cacheGrpCtxsForDefragmentation.add(oldGrpCtx); + } } /** */ public void executeDefragmentation() throws IgniteCheckedException { - log.info("Defragmentation started."); + status.onStart(cacheGrpCtxsForDefragmentation); try { // Now the actual process starts. 
@@ -203,23 +229,18 @@ public class CachePartitionDefragmentationManager { IgniteInternalFuture<?> idxDfrgFut = null; DataPageEvictionMode prevPageEvictionMode = null; - for (CacheGroupContext oldGrpCtx : sharedCtx.cache().cacheGroups()) { - if (!oldGrpCtx.userCache()) - continue; - + for (CacheGroupContext oldGrpCtx : cacheGrpCtxsForDefragmentation) { int grpId = oldGrpCtx.groupId(); - if (!cachesForDefragmentation.isEmpty()) { - if (oldGrpCtx.caches().stream().noneMatch(cctx -> cachesForDefragmentation.contains(cctx.name()))) - continue; - } - File workDir = filePageStoreMgr.cacheWorkDir(oldGrpCtx.sharedGroup(), oldGrpCtx.cacheOrGroupName()); - try { - if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) - continue; + if (skipAlreadyDefragmentedCacheGroup(workDir, grpId, log)) { + status.onCacheGroupSkipped(oldGrpCtx); + continue; + } + + try { GridCacheOffheapManager offheap = (GridCacheOffheapManager)oldGrpCtx.offheap(); List<CacheDataStore> oldCacheDataStores = stream(offheap.cacheDataStores().spliterator(), false) @@ -233,226 +254,243 @@ public class CachePartitionDefragmentationManager { }) .collect(Collectors.toList()); - if (workDir != null && !oldCacheDataStores.isEmpty()) { - // We can't start defragmentation of new group on the region that has wrong eviction mode. - // So waiting of the previous cache group defragmentation is inevitable. - DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode(); + status.onCacheGroupStart(oldGrpCtx, oldCacheDataStores.size()); - if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) { - prevPageEvictionMode = curPageEvictionMode; + if (workDir == null || oldCacheDataStores.isEmpty()) { + status.onCacheGroupFinish(oldGrpCtx); - partDataRegion.config().setPageEvictionMode(curPageEvictionMode); + continue; + } - if (idxDfrgFut != null) - idxDfrgFut.get(); - } + // We can't start defragmentation of new group on the region that has wrong eviction mode. 
+ // So waiting of the previous cache group defragmentation is inevitable. + DataPageEvictionMode curPageEvictionMode = oldGrpCtx.dataRegion().config().getPageEvictionMode(); - IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>(); + if (prevPageEvictionMode == null || prevPageEvictionMode != curPageEvictionMode) { + prevPageEvictionMode = curPageEvictionMode; - for (CacheDataStore store : offheap.cacheDataStores()) { - // Tree can be null for not yet initialized partitions. - // This would mean that these partitions are empty. - assert store.tree() == null || store.tree().groupId() == grpId; + partDataRegion.config().setPageEvictionMode(curPageEvictionMode); - if (store.tree() != null) - cacheDataStores.put(store.partId(), store); - } + if (idxDfrgFut != null) + idxDfrgFut.get(); + } - dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion()); + IntMap<CacheDataStore> cacheDataStores = new IntHashMap<>(); - // Another cheat. Ttl cleanup manager knows too much shit. - oldGrpCtx.caches().stream() - .filter(cacheCtx -> cacheCtx.groupId() == grpId) - .forEach(cacheCtx -> cacheCtx.ttl().unregister()); + for (CacheDataStore store : offheap.cacheDataStores()) { + // Tree can be null for not yet initialized partitions. + // This would mean that these partitions are empty. + assert store.tree() == null || store.tree().groupId() == grpId; - // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care - // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for - // cache group explicitly. - oldGrpCtx.localWalEnabled(false, false); + if (store.tree() != null) + cacheDataStores.put(store.partId(), store); + } - boolean encrypted = oldGrpCtx.config().isEncryptionEnabled(); + dbMgr.checkpointedDataRegions().remove(oldGrpCtx.dataRegion()); - FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted); + // Another cheat. Ttl cleanup manager knows too much shit. 
+ oldGrpCtx.caches().stream() + .filter(cacheCtx -> cacheCtx.groupId() == grpId) + .forEach(cacheCtx -> cacheCtx.ttl().unregister()); - createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, val -> { - }); //TODO Allocated tracker. + // Technically wal is already disabled, but "PageHandler.isWalDeltaRecordNeeded" doesn't care + // and WAL records will be allocated anyway just to be ignored later if we don't disable WAL for + // cache group explicitly. + oldGrpCtx.localWalEnabled(false, false); - checkCancellation(); + boolean encrypted = oldGrpCtx.config().isEncryptionEnabled(); - GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>(); + FilePageStoreFactory pageStoreFactory = filePageStoreMgr.getPageStoreFactory(grpId, encrypted); - PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory(); + AtomicLong idxAllocationTracker = new GridAtomicLong(); - CacheGroupContext newGrpCtx = new CacheGroupContext( - sharedCtx, - grpId, - oldGrpCtx.receivedFrom(), - CacheType.USER, - oldGrpCtx.config(), - oldGrpCtx.affinityNode(), - partDataRegion, - oldGrpCtx.cacheObjectContext(), - null, - null, - oldGrpCtx.localStartVersion(), - true, - false, - true - ); + createIndexPageStore(grpId, workDir, pageStoreFactory, partDataRegion, idxAllocationTracker::addAndGet); - defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock(); + checkCancellation(); - try { - // This will initialize partition meta in index partition - meta tree and reuse list. 
- newGrpCtx.start(); - } - finally { - defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock(); - } + GridCompoundFuture<Object, Object> cmpFut = new GridCompoundFuture<>(); - IntMap<LinkMap> linkMapByPart = new IntHashMap<>(); + PageMemoryEx oldPageMem = (PageMemoryEx)oldGrpCtx.dataRegion().pageMemory(); - for (CacheDataStore oldCacheDataStore : oldCacheDataStores) { - checkCancellation(); + CacheGroupContext newGrpCtx = new CacheGroupContext( + sharedCtx, + grpId, + oldGrpCtx.receivedFrom(), + CacheType.USER, + oldGrpCtx.config(), + oldGrpCtx.affinityNode(), + partDataRegion, + oldGrpCtx.cacheObjectContext(), + null, + null, + oldGrpCtx.localStartVersion(), + true, + false, + true + ); - int partId = oldCacheDataStore.partId(); + defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock(); - PartitionContext partCtx = new PartitionContext( - workDir, - grpId, - partId, - partDataRegion, - mappingDataRegion, - oldGrpCtx, - newGrpCtx, - cacheDataStores.get(partId), - pageStoreFactory - ); + try { + // This will initialize partition meta in index partition - meta tree and reuse list. 
+ newGrpCtx.start(); + } + finally { + defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadUnlock(); + } - if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) { - partCtx.createPageStore( - () -> defragmentedPartMappingFile(workDir, partId).toPath(), - partCtx.mappingPagesAllocated, - partCtx.mappingPageMemory - ); + IntMap<LinkMap> linkMapByPart = new IntHashMap<>(); - linkMapByPart.put(partId, partCtx.createLinkMapTree(false)); + for (CacheDataStore oldCacheDataStore : oldCacheDataStores) { + checkCancellation(); - continue; - } + int partId = oldCacheDataStore.partId(); + + PartitionContext partCtx = new PartitionContext( + workDir, + grpId, + partId, + partDataRegion, + mappingDataRegion, + oldGrpCtx, + newGrpCtx, + cacheDataStores.get(partId), + pageStoreFactory + ); + if (skipAlreadyDefragmentedPartition(workDir, grpId, partId, log)) { partCtx.createPageStore( () -> defragmentedPartMappingFile(workDir, partId).toPath(), partCtx.mappingPagesAllocated, partCtx.mappingPageMemory ); - linkMapByPart.put(partId, partCtx.createLinkMapTree(true)); + linkMapByPart.put(partId, partCtx.createLinkMapTree(false)); - checkCancellation(); + continue; + } - partCtx.createPageStore( - () -> defragmentedPartTmpFile(workDir, partId).toPath(), - partCtx.partPagesAllocated, - partCtx.partPageMemory - ); + partCtx.createPageStore( + () -> defragmentedPartMappingFile(workDir, partId).toPath(), + partCtx.mappingPagesAllocated, + partCtx.mappingPageMemory + ); + + linkMapByPart.put(partId, partCtx.createLinkMapTree(true)); + + checkCancellation(); + + partCtx.createPageStore( + () -> defragmentedPartTmpFile(workDir, partId).toPath(), + partCtx.partPagesAllocated, + partCtx.partPageMemory + ); partCtx.createNewCacheDataStore(offheap); copyPartitionData(partCtx, treeIter); - IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> { - if (fut.error() != null) - return; + DefragmentationPageReadWriteManager pageMgr = 
(DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager(); - PageStore oldPageStore = null; + PageStore oldPageStore = filePageStoreMgr.getStore(grpId, partId); - try { - oldPageStore = filePageStoreMgr.getStore(grpId, partId); - } - catch (IgniteCheckedException ignore) { - } + status.onPartitionDefragmented( + oldGrpCtx, + oldPageStore.size(), + pageSize + partCtx.partPagesAllocated.get() * pageSize // + file header. + ); - assert oldPageStore != null; + //TODO Move inside of defragmentSinglePartition. + IgniteInClosure<IgniteInternalFuture<?>> cpLsnr = fut -> { + if (fut.error() != null) + return; + + if (log.isDebugEnabled()) { + log.debug(S.toString( + "Partition defragmented", + "grpId", grpId, false, + "partId", partId, false, + "oldPages", oldPageStore.pages(), false, + "newPages", partCtx.partPagesAllocated.get() + 1, false, + "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false, + "pageSize", pageSize, false, + "partFile", defragmentedPartFile(workDir, partId).getName(), false, + "workDir", workDir, false + )); + } - if (log.isDebugEnabled()) { - log.debug(S.toString( - "Partition defragmented", - "grpId", grpId, false, - "partId", partId, false, - "oldPages", oldPageStore.pages(), false, - "newPages", partCtx.partPagesAllocated.get() + 1, false, - "mappingPages", partCtx.mappingPagesAllocated.get() + 1, false, - "pageSize", pageSize, false, - "partFile", defragmentedPartFile(workDir, partId).getName(), false, - "workDir", workDir, false - )); - } + oldPageMem.invalidate(grpId, partId); - oldPageMem.invalidate(grpId, partId); + partCtx.partPageMemory.invalidate(grpId, partId); - partCtx.partPageMemory.invalidate(grpId, partId); + pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second. 
- DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partCtx.partPageMemory.pageManager(); + renameTempPartitionFile(workDir, partId); + }; - pageMgr.pageStoreMap().removePageStore(grpId, partId); // Yes, it'll be invalid in a second. + GridFutureAdapter<?> cpFut = defragmentationCheckpoint + .forceCheckpoint("partition defragmented", null) + .futureFor(CheckpointState.FINISHED); - renameTempPartitionFile(workDir, partId); - }; + cpFut.listen(cpLsnr); - GridFutureAdapter<?> cpFut = defragmentationCheckpoint - .forceCheckpoint("partition defragmented", null) - .futureFor(CheckpointState.FINISHED); + cmpFut.add((IgniteInternalFuture<Object>)cpFut); + } - cpFut.listen(cpLsnr); + // A bit too general for now, but I like it more then saving only the last checkpoint future. + cmpFut.markInitialized().get(); - cmpFut.add((IgniteInternalFuture<Object>)cpFut); - } + idxDfrgFut = new GridFinishedFuture<>(); - // A bit too general for now, but I like it more then saving only the last checkpoint future. 
- cmpFut.markInitialized().get(); + if (filePageStoreMgr.hasIndexStore(grpId)) { + defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart); - idxDfrgFut = new GridFinishedFuture<>(); + idxDfrgFut = defragmentationCheckpoint + .forceCheckpoint("index defragmented", null) + .futureFor(CheckpointState.FINISHED); + } - if (filePageStoreMgr.hasIndexStore(grpId)) { - defragmentIndexPartition(oldGrpCtx, newGrpCtx, linkMapByPart); + idxDfrgFut = idxDfrgFut.chain(fut -> { + oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION); - idxDfrgFut = defragmentationCheckpoint - .forceCheckpoint("index defragmented", null) - .futureFor(CheckpointState.FINISHED); - } + PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory(); - idxDfrgFut = idxDfrgFut.chain(fut -> { - oldPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION); + partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION); - PageMemoryEx partPageMem = (PageMemoryEx)partDataRegion.pageMemory(); + DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager(); - partPageMem.invalidate(grpId, PageIdAllocator.INDEX_PARTITION); + pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION); - DefragmentationPageReadWriteManager pageMgr = (DefragmentationPageReadWriteManager)partPageMem.pageManager(); + PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory(); - pageMgr.pageStoreMap().removePageStore(grpId, PageIdAllocator.INDEX_PARTITION); + pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager(); - PageMemoryEx mappingPageMem = (PageMemoryEx)mappingDataRegion.pageMemory(); + pageMgr.pageStoreMap().clear(grpId); - pageMgr = (DefragmentationPageReadWriteManager)mappingPageMem.pageManager(); + renameTempIndexFile(workDir); - pageMgr.pageStoreMap().clear(grpId); + writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log); - renameTempIndexFile(workDir); + 
batchRenameDefragmentedCacheGroupPartitions(workDir, log); - writeDefragmentationCompletionMarker(filePageStoreMgr.getPageStoreFileIoFactory(), workDir, log); + return null; + }); - batchRenameDefragmentedCacheGroupPartitions(workDir, log); - return null; - }); - } + PageStore oldIdxPageStore = filePageStoreMgr.getStore(grpId, INDEX_PARTITION); + + status.onIndexDefragmented( + oldGrpCtx, + oldIdxPageStore.size(), + pageSize + idxAllocationTracker.get() * pageSize // + file header. + ); } catch (DefragmentationCancelledException e) { DefragmentationFileUtils.deleteLeftovers(workDir); throw e; } + + status.onCacheGroupFinish(oldGrpCtx); } if (idxDfrgFut != null) @@ -460,18 +498,24 @@ public class CachePartitionDefragmentationManager { mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); - log.info("Defragmentation completed. All partitions are defragmented."); + status.onFinish(); completionFut.onDone(); } catch (DefragmentationCancelledException e) { mntcReg.unregisterMaintenanceTask(DEFRAGMENTATION_MNTC_TASK_NAME); - log.info("Defragmentation has been cancelled."); + log.info("Defragmentation process has been cancelled."); + + status.onFinish(); completionFut.onDone(); } catch (Throwable t) { + log.error("Defragmentation process failed.", t); + + status.onFinish(); + completionFut.onDone(t); throw t; @@ -553,7 +597,7 @@ public class CachePartitionDefragmentationManager { /** */ public String status() { - throw new UnsupportedOperationException("Not implemented yet."); + return status.toString(); } /** @@ -918,4 +962,209 @@ public class CachePartitionDefragmentationManager { /** Serial version uid. 
*/ private static final long serialVersionUID = 0L; } + + /** */ + private class DefragmentationStatus { + /** */ + private long startTs; + + /** */ + private long finishTs; + + /** */ + private final Set<String> scheduledGroups = new TreeSet<>(); + + /** */ + private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> progressGroups + = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + + /** */ + private final Map<CacheGroupContext, DefragmentationCacheGroupProgress> finishedGroups + = new TreeMap<>(comparing(CacheGroupContext::cacheOrGroupName)); + + /** */ + private final Set<String> skippedGroups = new TreeSet<>(); + + /** */ + public synchronized void onStart(Set<CacheGroupContext> scheduledGroups) { + startTs = System.currentTimeMillis(); + + for (CacheGroupContext grp : scheduledGroups) { + this.scheduledGroups.add(grp.cacheOrGroupName()); + } + + log.info("Defragmentation started."); + } + + /** */ + public synchronized void onCacheGroupStart(CacheGroupContext grpCtx, int parts) { + scheduledGroups.remove(grpCtx.cacheOrGroupName()); + + progressGroups.put(grpCtx, new DefragmentationCacheGroupProgress(parts)); + } + + /** */ + public synchronized void onPartitionDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { + progressGroups.get(grpCtx).onPartitionDefragmented(oldSize, newSize); + } + + /** */ + public synchronized void onIndexDefragmented(CacheGroupContext grpCtx, long oldSize, long newSize) { + progressGroups.get(grpCtx).onIndexDefragmented(oldSize, newSize); + } + + /** */ + public synchronized void onCacheGroupFinish(CacheGroupContext grpCtx) { + DefragmentationCacheGroupProgress progress = progressGroups.remove(grpCtx); + + progress.onFinish(); + + finishedGroups.put(grpCtx, progress); + } + + /** */ + public synchronized void onCacheGroupSkipped(CacheGroupContext grpCtx) { + scheduledGroups.remove(grpCtx.cacheOrGroupName()); + + skippedGroups.add(grpCtx.cacheOrGroupName()); + } + + /** */ + public 
synchronized void onFinish() { + finishTs = System.currentTimeMillis(); + + progressGroups.clear(); + + scheduledGroups.clear(); + + log.info("Defragmentation process completed. Time: " + (finishTs - startTs) * 1e-3 + "s."); + } + + /** {@inheritDoc} */ + @Override public synchronized String toString() { + StringBuilder sb = new StringBuilder(); + + if (!finishedGroups.isEmpty()) { + sb.append("Defragmentation is completed for cache groups:\n"); + + for (Map.Entry<CacheGroupContext, DefragmentationCacheGroupProgress> entry : finishedGroups.entrySet()) { + sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - "); + + sb.append(entry.getValue().toString()).append('\n'); + } + } + + if (!progressGroups.isEmpty()) { + sb.append("Defragmentation is in progress for cache groups:\n"); + + for (Map.Entry<CacheGroupContext, DefragmentationCacheGroupProgress> entry : progressGroups.entrySet()) { + sb.append(" ").append(entry.getKey().cacheOrGroupName()).append(" - "); + + sb.append(entry.getValue().toString()).append('\n'); + } + } + + if (!skippedGroups.isEmpty()) + sb.append("Skipped cache groups: ").append(String.join(", ", skippedGroups)).append('\n'); + + if (!scheduledGroups.isEmpty()) + sb.append("Awaiting defragmentation: ").append(String.join(", ", scheduledGroups)).append('\n'); + + return sb.toString(); + } + } + + /** */ + private static class DefragmentationCacheGroupProgress { + /** */ + private static final DecimalFormat MB_FORMAT = new DecimalFormat( + "#.##", + DecimalFormatSymbols.getInstance(Locale.US) + ); + + /** */ + private final int partsTotal; + + /** */ + private int partsCompleted; + + /** */ + private long oldSize; + + /** */ + private long newSize; + + /** */ + private final long startTs; + + /** */ + private long finishTs; + + /** */ + public DefragmentationCacheGroupProgress(int parts) { + partsTotal = parts; + + startTs = System.currentTimeMillis(); + } + + /** + * @param oldSize Old partition size. 
+ * @param newSize New partition size. + */ + public void onPartitionDefragmented(long oldSize, long newSize) { + ++partsCompleted; + + this.oldSize += oldSize; + this.newSize += newSize; + } + + /** + * @param oldSize Old partition size. + * @param newSize New partition size. + */ + public void onIndexDefragmented(long oldSize, long newSize) { + this.oldSize += oldSize; + this.newSize += newSize; + } + + /** */ + public void onFinish() { + finishTs = System.currentTimeMillis(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + StringBuilder sb = new StringBuilder(); + + if (finishTs == 0) { + sb.append("partitions processed/all: ").append(partsCompleted).append("/").append(partsTotal); + + sb.append(", time elapsed: "); + + appendDuration(sb, System.currentTimeMillis()); + } + else { + double mb = 1024 * 1024; + + sb.append("size before/after: ").append(MB_FORMAT.format(oldSize / mb)).append("MB/"); + sb.append(MB_FORMAT.format(newSize / mb)).append("MB"); + + sb.append(", time took: "); + + appendDuration(sb, finishTs); + } + + return sb.toString(); + } + + /** */ + private void appendDuration(StringBuilder sb, long end) { + long duration = Math.round((end - startTs) * 1e-3); + + long mins = duration / 60; + long secs = duration % 60; + + sb.append(mins).append(" mins ").append(secs).append(" secs"); + } + } }