This is an automated email from the ASF dual-hosted git repository. timoninmaxim pushed a commit to branch IGNITE-17177_inc_snapshots in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/IGNITE-17177_inc_snapshots by this push: new bd2056191ba IGNITE-17613 Copy of binary metadata added (#10350) bd2056191ba is described below commit bd2056191ba0f334503db69665a3ec74a5fee658 Author: Nikolay <nizhi...@apache.org> AuthorDate: Thu Oct 27 22:57:52 2022 +0300 IGNITE-17613 Copy of binary metadata added (#10350) --- .../snapshot/IncrementalSnapshotFutureTask.java | 113 +++++++++++++++------ 1 file changed, 83 insertions(+), 30 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java index 2bbaad1d4c4..01c44e178c4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotFutureTask.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.processors.cache.persistence.snapshot; import java.io.File; +import java.io.FileFilter; +import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.HashSet; @@ -25,14 +27,22 @@ import java.util.Set; import java.util.UUID; import java.util.function.BiConsumer; import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.MarshallerContextImpl; +import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.pagemem.wal.record.delta.ClusterSnapshotRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; import org.apache.ignite.internal.util.typedef.internal.CU; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.internal.binary.BinaryUtils.METADATA_FILE_SUFFIX; +import static org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl.binaryWorkDir; + /** */ class IncrementalSnapshotFutureTask extends AbstractSnapshotFutureTask<IncrementalSnapshotFutureTaskResult> @@ -122,41 +132,25 @@ class IncrementalSnapshotFutureTask cctx.kernalContext().pools().getSnapshotExecutorService().submit(() -> { try { - // First increment must include low segment, because full snapshot knows nothing about WAL. - // All other begins from the next segment because lowPtr already saved inside previous increment. - long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1); - long highIdx = highPtr.index(); - - assert cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled() - : "WAL Compaction must be enabled"; - assert lowIdx <= highIdx; - - if (log.isInfoEnabled()) - log.info("Waiting for WAL segments compression [lowIdx=" + lowIdx + ", highIdx=" + highIdx + ']'); - - cctx.wal().awaitCompacted(highPtr.index()); + copyWal(incSnpDir); - if (log.isInfoEnabled()) { - log.info("Linking WAL segments into incremental snapshot [lowIdx=" + lowIdx + ", " + - "highIdx=" + highIdx + ']'); - } + File snpMarshallerDir = MarshallerContextImpl.mappingFileStoreWorkDir(incSnpDir.getAbsolutePath()); - for (; lowIdx <= highIdx; lowIdx++) { - File seg = cctx.wal().compactedSegment(lowIdx); + copyFiles( + MarshallerContextImpl.mappingFileStoreWorkDir(cctx.gridConfig().getWorkDirectory()), + snpMarshallerDir, + BinaryUtils::notTmpFile + ); - if (!seg.exists()) { - onDone(new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']')); + PdsFolderSettings<?> pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders(); - return; - } + File snpBinMetaDir = new File(incSnpDir, DataStorageConfiguration.DFLT_BINARY_METADATA_PATH); - Path segLink = incSnpDir.toPath().resolve(seg.getName()); - - if (log.isDebugEnabled()) - log.debug("Creaing segment link [path=" + segLink.toAbsolutePath() + ']'); - - Files.createLink(segLink, seg.toPath()); - } + copyFiles( + binaryWorkDir(cctx.gridConfig().getWorkDirectory(), pdsSettings.folderName()), + snpBinMetaDir, + file -> file.getName().endsWith(METADATA_FILE_SUFFIX) + ); onDone(new IncrementalSnapshotFutureTaskResult()); } @@ -172,6 +166,65 @@ class IncrementalSnapshotFutureTask } } + /** + * Copies WAL segments to the incremental snapshot directory. + * + * @param incSnpDir Incremental snapshot directory. + * @throws IgniteInterruptedCheckedException If failed. + * @throws IOException If failed. + */ + private void copyWal(File incSnpDir) throws IgniteInterruptedCheckedException, IOException { + // First increment must include low segment, because full snapshot knows nothing about WAL. + // All other begins from the next segment because lowPtr already saved inside previous increment. + long lowIdx = lowPtr.index() + (incIdx == 1 ? 0 : 1); + long highIdx = highPtr.index(); + + assert cctx.gridConfig().getDataStorageConfiguration().isWalCompactionEnabled() + : "WAL Compaction must be enabled"; + assert lowIdx <= highIdx; + + if (log.isInfoEnabled()) + log.info("Waiting for WAL segments compression [lowIdx=" + lowIdx + ", highIdx=" + highIdx + ']'); + + cctx.wal().awaitCompacted(highPtr.index()); + + if (log.isInfoEnabled()) { + log.info("Linking WAL segments into incremental snapshot [lowIdx=" + lowIdx + ", " + + "highIdx=" + highIdx + ']'); + } + + for (; lowIdx <= highIdx; lowIdx++) { + File seg = cctx.wal().compactedSegment(lowIdx); + + if (!seg.exists()) + throw new IgniteException("WAL segment not found in archive [idx=" + lowIdx + ']'); + + Path segLink = incSnpDir.toPath().resolve(seg.getName()); + + if (log.isDebugEnabled()) + log.debug("Creaing segment link [path=" + segLink.toAbsolutePath() + ']'); + + Files.createLink(segLink, seg.toPath()); + } + } + + /** + * Copy files {@code fromDir} to {@code toDir}. + * + * @param fromDir From directory. + * @param toDir To directory. + * @param filter File filter. + */ + private void copyFiles(File fromDir, File toDir, FileFilter filter) throws IOException { + assert fromDir.exists() && fromDir.isDirectory(); + + if (!toDir.isDirectory() && !toDir.exists() && !toDir.mkdirs()) + throw new IgniteException("Target directory can't be created [target=" + toDir.getAbsolutePath() + ']'); + + for (File from : fromDir.listFiles(filter)) + Files.copy(from.toPath(), new File(toDir, from.getName()).toPath()); + } + /** {@inheritDoc} */ @Override public void acceptException(Throwable th) { cctx.cache().configManager().removeConfigurationChangeListener(this);