This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-27308 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 2072e3d44b25a283e992a780764574213c861a88 Author: Kirill Tkalenko <[email protected]> AuthorDate: Thu Jan 8 11:36:34 2026 +0300 IGNITE-27308 wip --- .../persistence/checkpoint/Checkpointer.java | 4 +- .../persistence/compaction/CompactionRound.java | 66 ++++++ .../persistence/compaction/Compactor.java | 232 +++++++++++---------- .../compaction/DeltaFileForCompaction.java | 37 ++++ .../persistence/compaction/CompactorTest.java | 58 ++++-- 5 files changed, 259 insertions(+), 138 deletions(-) diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java index a7a786f1d05..3efee1a7c41 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/Checkpointer.java @@ -352,7 +352,7 @@ public class Checkpointer extends IgniteWorker { Checkpoint chp = null; try { - compactor.pause(); + compactor.notifyCheckpointStart(); var tracker = new CheckpointMetricsTracker(); @@ -470,7 +470,7 @@ public class Checkpointer extends IgniteWorker { } finally { currentCheckpointProgressForThrottling = null; - compactor.resume(); + compactor.notifyCheckpointFinish(); } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java new file mode 100644 index 00000000000..9408e3b0182 --- /dev/null +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactionRound.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.compaction; + +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; + +/** Data class for compaction round. */ +class CompactionRound { + /** Compaction round ID. */ + final UUID id = UUID.randomUUID(); + + /** Total number of partition files. */ + final int partitionFileCount; + + /** Total number of all partition delta files. */ + final int totalDeltaFileCount; + + /** Queue of delta files (one per partition) to be merged into the partition file. */ + final Queue<DeltaFileForCompaction> queue; + + private CompactionRound(int partitionFileCount, int totalDeltaFileCount, Queue<DeltaFileForCompaction> queue) { + this.partitionFileCount = partitionFileCount; + this.totalDeltaFileCount = totalDeltaFileCount; + this.queue = queue; + } + + static CompactionRound create(FilePageStoreManager filePageStoreManager) { + var partitionFileCount = new int[]{0}; + var totalDeltaFileCount = new int[]{0}; + + var queue = new ConcurrentLinkedQueue<DeltaFileForCompaction>(); + + filePageStoreManager.allPageStores().forEach(pageStore -> { + partitionFileCount[0]++; + + totalDeltaFileCount[0] += pageStore.pageStore().deltaFileCount(); + + DeltaFilePageStoreIo deltaFileToCompaction = pageStore.pageStore().getDeltaFileToCompaction(); + + if (deltaFileToCompaction != null) { + queue.add(new DeltaFileForCompaction(pageStore, deltaFileToCompaction)); + } + }); + + return new CompactionRound(partitionFileCount[0], totalDeltaFileCount[0], queue); + } +} diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java index cae78c256cb..675193e27f5 100644 --- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/Compactor.java @@ -20,16 +20,11 @@ package org.apache.ignite.internal.pagememory.persistence.compaction; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static java.util.stream.Collectors.toCollection; import static org.apache.ignite.internal.failure.FailureType.SYSTEM_WORKER_TERMINATION; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.Objects; -import java.util.Queue; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -46,7 +41,6 @@ import org.apache.ignite.internal.pagememory.persistence.WriteSpeedFormatter; import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager; -import org.apache.ignite.internal.pagememory.persistence.store.GroupPageStoresMap.GroupPartitionPageStore; import org.apache.ignite.internal.thread.IgniteThread; import org.apache.ignite.internal.thread.IgniteThreadFactory; import org.apache.ignite.internal.util.IgniteUtils; @@ -67,6 +61,10 @@ import org.jetbrains.annotations.Nullable; * <li>Fsync of the partition file.</li> * <li>Remove delta file from {@link FilePageStore} and file system.</li> * </ul> + * + * <p>Optimization has been implemented to speed up checkpointing. When a checkpoint starts, compaction is stopped to allow for the IO + * operations for it. However, this is only true as long as the total number of delta files does not exceed 3 * partitions, to prevent + * errors due to a large number of open files.</p> */ public class Compactor extends IgniteWorker { /** Logger. */ @@ -92,8 +90,11 @@ public class Compactor extends IgniteWorker { private final PartitionDestructionLockManager partitionDestructionLockManager; - /** Guarded by {@link #mux}. */ - private boolean paused; + /** Flag indicating whether a checkpoint has started. Guarded by {@link #mux}. */ + private boolean isCheckpointStarted; + + /** Current compaction round, {@code null} means the round has either not started yet or has finished. */ + private volatile @Nullable CompactionRound currentCompactionRound; /** * Creates new ignite worker with given parameters. @@ -167,7 +168,7 @@ public class Compactor extends IgniteWorker { void waitDeltaFiles() { try { synchronized (mux) { - while ((!addedDeltaFiles || paused) && !isCancelled()) { + while (!addedDeltaFiles && !isCancelled()) { blockingSectionBegin(); try { @@ -206,106 +207,107 @@ public class Compactor extends IgniteWorker { * pages, we must look for it from the oldest delta file. */ void doCompaction() { - while (!isPaused()) { - // Let's collect one delta file for each partition. - Queue<DeltaFileForCompaction> queue = filePageStoreManager.allPageStores() - .map(groupPartitionFilePageStore -> { - DeltaFilePageStoreIo deltaFileToCompaction = groupPartitionFilePageStore.pageStore().getDeltaFileToCompaction(); - - if (deltaFileToCompaction == null) { - return null; - } - - return new DeltaFileForCompaction(groupPartitionFilePageStore, deltaFileToCompaction); - }) - .filter(Objects::nonNull) - .collect(toCollection(ConcurrentLinkedQueue::new)); + while (true) { + CompactionRound compactionRound = CompactionRound.create(filePageStoreManager); - if (queue.isEmpty()) { + if (compactionRound.queue.isEmpty()) { break; } - String compactionId = UUID.randomUUID().toString(); + setCurrentCompactionRound(compactionRound); - if (LOG.isInfoEnabled()) { - LOG.info("Starting new compaction round [compactionId={}, files={}]", compactionId, queue.size()); - } + try { + if (shouldStopCompaction()) { + break; + } - CompactionMetricsTracker tracker = new CompactionMetricsTracker(); + if (LOG.isInfoEnabled()) { + LOG.info( + "Starting new compaction round [compactionId={}, files={}]", + compactionRound.id, + compactionRound.queue.size() + ); + } - updateHeartbeat(); + CompactionMetricsTracker tracker = new CompactionMetricsTracker(); - int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize(); + updateHeartbeat(); - CompletableFuture<?>[] futures = new CompletableFuture[threads]; + int threads = threadPoolExecutor == null ? 1 : threadPoolExecutor.getMaximumPoolSize(); - for (int i = 0; i < threads; i++) { - CompletableFuture<?> future = futures[i] = new CompletableFuture<>(); + CompletableFuture<?>[] futures = new CompletableFuture[threads]; - Runnable merger = () -> { - DeltaFileForCompaction toMerge; + for (int i = 0; i < threads; i++) { + CompletableFuture<?> future = futures[i] = new CompletableFuture<>(); - try { - while (true) { - toMerge = queue.poll(); + Runnable merger = () -> { + DeltaFileForCompaction toMerge; - if (toMerge == null) { - break; - } + try { + while (true) { + toMerge = compactionRound.queue.poll(); + + if (toMerge == null) { + break; + } - GroupPartitionId groupPartitionId = toMerge.groupPartitionFilePageStore.groupPartitionId(); + GroupPartitionId groupPartitionId = toMerge.groupPartitionFilePageStore.groupPartitionId(); - Lock partitionDestructionLock = partitionDestructionLockManager.destructionLock(groupPartitionId).readLock(); + Lock partitionDestructionLock = partitionDestructionLockManager.destructionLock(groupPartitionId) + .readLock(); - partitionDestructionLock.lock(); + partitionDestructionLock.lock(); - try { - mergeDeltaFileToMainFile( - toMerge.groupPartitionFilePageStore.pageStore(), - toMerge.deltaFilePageStoreIo, - tracker - ); - } finally { - partitionDestructionLock.unlock(); + try { + mergeDeltaFileToMainFile( + toMerge.groupPartitionFilePageStore.pageStore(), + toMerge.deltaFilePageStoreIo, + tracker + ); + } finally { + partitionDestructionLock.unlock(); + } } + } catch (Throwable ex) { + future.completeExceptionally(ex); } - } catch (Throwable ex) { - future.completeExceptionally(ex); - } - future.complete(null); - }; + future.complete(null); + }; - if (isCancelled()) { - return; - } + if (isCancelled()) { + return; + } - if (threadPoolExecutor == null) { - merger.run(); - } else { - threadPoolExecutor.execute(merger); + if (threadPoolExecutor == null) { + merger.run(); + } else { + threadPoolExecutor.execute(merger); + } } - } - updateHeartbeat(); + updateHeartbeat(); - // Wait and check for errors. - CompletableFuture.allOf(futures).join(); + // Wait and check for errors. + CompletableFuture.allOf(futures).join(); - tracker.onCompactionEnd(); + tracker.onCompactionEnd(); - if (LOG.isInfoEnabled()) { - long totalWriteBytes = (long) pageSize * tracker.dataPagesWritten(); - long totalDurationInNanos = tracker.totalDuration(NANOSECONDS); + if (LOG.isInfoEnabled()) { + long totalWriteBytes = (long) pageSize * tracker.dataPagesWritten(); + long totalDurationInNanos = tracker.totalDuration(NANOSECONDS); - LOG.info( - "Compaction round finished [compactionId={}, pages={}, skipped={}, duration={}ms, avgWriteSpeed={}MB/s]", - compactionId, - tracker.dataPagesWritten(), - tracker.dataPagesSkipped(), - tracker.totalDuration(MILLISECONDS), - WriteSpeedFormatter.formatWriteSpeed(totalWriteBytes, totalDurationInNanos) - ); + LOG.info( + "Compaction round finished [compactionId={}, pages={}, skipped={}, duration={}ms, avgWriteSpeed={}MB/s]", + compactionRound.id, + tracker.dataPagesWritten(), + tracker.dataPagesSkipped(), + tracker.totalDuration(MILLISECONDS), + WriteSpeedFormatter.formatWriteSpeed(totalWriteBytes, totalDurationInNanos) + ); + } + } finally { + resetCurrentCompactionRound(); } } } @@ -357,8 +359,6 @@ public class Compactor extends IgniteWorker { } synchronized (mux) { - paused = false; - // Do not interrupt runner thread. isCancelled.set(true); @@ -487,38 +487,21 @@ public class Compactor extends IgniteWorker { } /** - * Delta file for compaction. + * Notifies about the start of a checkpoint. It is expected that this method will not be called multiple times in parallel and + * subsequent calls will strictly be calls after {@link #notifyCheckpointFinish}. */ - private static class DeltaFileForCompaction { - private final GroupPartitionPageStore<FilePageStore> groupPartitionFilePageStore; - - private final DeltaFilePageStoreIo deltaFilePageStoreIo; - - private DeltaFileForCompaction( - GroupPartitionPageStore<FilePageStore> groupPartitionFilePageStore, - DeltaFilePageStoreIo deltaFilePageStoreIo - ) { - this.groupPartitionFilePageStore = groupPartitionFilePageStore; - this.deltaFilePageStoreIo = deltaFilePageStoreIo; - } - } - - /** - * Pauses the compactor until it is resumed or compactor is stopped. It is expected that this method will not be called multiple times - * in parallel and subsequent calls will strictly be calls after {@link #resume}. - */ - public void pause() { + public void notifyCheckpointStart() { synchronized (mux) { - assert !paused : "It is expected that a further pause will only occur after resume"; + assert !isCheckpointStarted : "Expected to be called after a checkpoint is finished"; - paused = true; + isCheckpointStarted = true; } } - /** Resumes the compactor if it was paused. It is expected that this method will not be called multiple times in parallel. */ - public void resume() { + /** Notifies about the finish of a checkpoint. It is expected that this method will not be called multiple times in parallel. */ + public void notifyCheckpointFinish() { synchronized (mux) { - paused = false; + isCheckpointStarted = false; // Force compaction as we could stop somewhere in the middle and we need to continue compaction. addedDeltaFiles = true; @@ -527,14 +510,37 @@ public class Compactor extends IgniteWorker { } } - /** Must be called before each IO operation to pause the current compaction and to provide IO resources to other components. */ - private boolean isPaused() { + private boolean shouldStopCompaction(FilePageStore filePageStore) { + return filePageStore.isMarkedToDestroy() || shouldStopCompaction(); + } + + private boolean shouldStopCompaction() { + if (isCancelled()) { + return true; + } + synchronized (mux) { - return paused; + // We only need to stop compaction if a checkpoint has occurred in parallel and the total number of delta files is not yet high + // enough. The number 3 is based on observations of failed tests and intuition; it may change in the future. + return isCheckpointStarted && currentCompactionRound().totalDeltaFileCount < (3 * currentCompactionRound().partitionFileCount); } } - private boolean shouldStopCompaction(FilePageStore filePageStore) { - return isCancelled() || filePageStore.isMarkedToDestroy() || isPaused(); + private void setCurrentCompactionRound(CompactionRound compactionRound) { + assert currentCompactionRound == null : "Previous round is expected to be completed"; + + currentCompactionRound = compactionRound; + } + + private void resetCurrentCompactionRound() { + currentCompactionRound = null; + } + + private CompactionRound currentCompactionRound() { + CompactionRound compactionRound = currentCompactionRound; + + assert compactionRound != null : "Compaction round has not yet started"; + + return compactionRound; } } diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java new file mode 100644 index 00000000000..ca8cc2bf5c9 --- /dev/null +++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/persistence/compaction/DeltaFileForCompaction.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagememory.persistence.compaction; + +import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo; +import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore; +import org.apache.ignite.internal.pagememory.persistence.store.GroupPageStoresMap.GroupPartitionPageStore; + +/** Delta file for compaction. */ +class DeltaFileForCompaction { + final GroupPartitionPageStore<FilePageStore> groupPartitionFilePageStore; + + final DeltaFilePageStoreIo deltaFilePageStoreIo; + + DeltaFileForCompaction( + GroupPartitionPageStore<FilePageStore> groupPartitionFilePageStore, + DeltaFilePageStoreIo deltaFilePageStoreIo + ) { + this.groupPartitionFilePageStore = groupPartitionFilePageStore; + this.deltaFilePageStoreIo = deltaFilePageStoreIo; + } +} diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java index 0ab77fe9ae5..9d32a5a3622 100644 --- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java +++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/compaction/CompactorTest.java @@ -47,6 +47,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.internal.failure.FailureManager; import org.apache.ignite.internal.pagememory.io.PageIo; @@ -220,13 +221,11 @@ public class CompactorTest extends BaseIgniteAbstractTest { when(filePageStore.getDeltaFileToCompaction()).then(answer -> deltaFilePageStoreIoRef.get()); - FilePageStoreManager filePageStoreManager = mock(FilePageStoreManager.class); - - GroupPageStoresMap<FilePageStore> groupPageStoresMap = new GroupPageStoresMap<>(new LongOperationAsyncExecutor("test", log)); + var groupPageStoresMap = new GroupPageStoresMap<FilePageStore>(new LongOperationAsyncExecutor("test", log)); groupPageStoresMap.put(new GroupPartitionId(0, 0), filePageStore); - when(filePageStoreManager.allPageStores()).then(answer -> groupPageStoresMap.getAll()); + FilePageStoreManager filePageStoreManager = newFilePageStoreManager(groupPageStoresMap); Compactor compactor = spy(newCompactor(filePageStoreManager)); @@ -303,18 +302,21 @@ public class CompactorTest extends BaseIgniteAbstractTest { } @Test - void testPauseResume() throws Exception { - Compactor compactor = spy(newCompactor()); + void testNotifyCheckpointStartAndFinish() throws Exception { + var groupPageStoresMap = new GroupPageStoresMap<FilePageStore>(new LongOperationAsyncExecutor("test", log)); + + FilePageStoreManager filePageStoreManager = newFilePageStoreManager(groupPageStoresMap); + + Compactor compactor = spy(newCompactor(filePageStoreManager)); - compactor.pause(); + compactor.notifyCheckpointStart(); DeltaFilePageStoreIo deltaFilePageStoreIo = createDeltaFilePageStoreIo(); FilePageStore filePageStore = createFilePageStore(deltaFilePageStoreIo); - assertThat( - runAsync(() -> compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker())), - willCompleteSuccessfully() - ); + groupPageStoresMap.put(new GroupPartitionId(0, 0), filePageStore); + + assertThat(runAsync(compactor::doCompaction), willCompleteSuccessfully()); verify(filePageStore, never()).write(anyLong(), any()); verify(filePageStore, never()).sync(); @@ -324,12 +326,9 @@ public class CompactorTest extends BaseIgniteAbstractTest { verify(deltaFilePageStoreIo, never()).markMergedToFilePageStore(); verify(deltaFilePageStoreIo, never()).stop(anyBoolean()); - compactor.resume(); + compactor.notifyCheckpointFinish(); - assertThat( - runAsync(() -> compactor.mergeDeltaFileToMainFile(filePageStore, deltaFilePageStoreIo, new CompactionMetricsTracker())), - willCompleteSuccessfully() - ); + assertThat(runAsync(compactor::doCompaction), willCompleteSuccessfully()); verify(filePageStore).write(anyLong(), any()); verify(filePageStore).sync(); @@ -341,18 +340,15 @@ public class CompactorTest extends BaseIgniteAbstractTest { } @Test - void testTriggerCompactionAfterPause() { + void testTriggerCompactionAfterNotifyCheckpointStartAndFinish() { Compactor compactor = spy(newCompactor()); - compactor.pause(); + compactor.notifyCheckpointStart(); CompletableFuture<?> waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles); assertThat(waitDeltaFilesFuture, willTimeoutFast()); compactor.triggerCompaction(); - assertThat(waitDeltaFilesFuture, willTimeoutFast()); - - compactor.resume(); assertThat(waitDeltaFilesFuture, willCompleteSuccessfully()); } @@ -363,7 +359,7 @@ public class CompactorTest extends BaseIgniteAbstractTest { CompletableFuture<?> waitDeltaFilesFuture = runAsync(compactor::waitDeltaFiles); assertThat(waitDeltaFilesFuture, willTimeoutFast()); - compactor.resume(); + compactor.notifyCheckpointFinish(); assertThat(waitDeltaFilesFuture, willCompleteSuccessfully()); } @@ -407,8 +403,24 @@ public class CompactorTest extends BaseIgniteAbstractTest { private static FilePageStore createFilePageStore(DeltaFilePageStoreIo deltaFilePageStoreIo) { FilePageStore filePageStore = mock(FilePageStore.class); - when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).thenReturn(true); + var removedDeltaFile = new AtomicBoolean(); + + when(filePageStore.removeDeltaFile(eq(deltaFilePageStoreIo))).then(invocation -> { + removedDeltaFile.set(true); + + return true; + }); + when(filePageStore.getDeltaFileToCompaction()).then(invocation -> removedDeltaFile.get() ? null : deltaFilePageStoreIo); + when(filePageStore.deltaFileCount()).thenReturn(1); return filePageStore; } + + private static FilePageStoreManager newFilePageStoreManager(GroupPageStoresMap<FilePageStore> map) { + FilePageStoreManager manager = mock(FilePageStoreManager.class); + + when(manager.allPageStores()).then(invocation -> map.getAll()); + + return manager; + } }
