This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 0abf6fa IGNITE-13613 API to get full WAL size and implementation to track WAL segments rollover and compression processes - Fixes #8388. 0abf6fa is described below commit 0abf6fafe77ec447f2bfea1d7583a2b85b2ab192 Author: ktkalenko <ktkale...@gridgain.com> AuthorDate: Fri Oct 30 09:51:55 2020 +0300 IGNITE-13613 API to get full WAL size and implementation to track WAL segments rollover and compression processes - Fixes #8388. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../internal/dto/IgniteDataTransferObject.java | 6 + .../pagemem/wal/IgniteWriteAheadLogManager.java | 15 + .../processors/cache/GridCacheSharedContext.java | 2 +- .../cache/persistence/wal/FileDescriptor.java | 26 +- .../persistence/wal/FileWriteAheadLogManager.java | 332 +++++++++++++-------- .../persistence/db/wal/IgniteLocalWalSizeTest.java | 229 ++++++++++++++ .../cache/persistence/pagemem/NoOpWALManager.java | 10 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 8 files changed, 489 insertions(+), 134 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java b/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java index 4d735d0..279586d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java @@ -59,6 +59,12 @@ public abstract class IgniteDataTransferObject implements Externalizable { /** Version 7. */ protected static final byte V7 = 7; + /** Version 8. */ + protected static final byte V8 = 8; + + /** Version 9. */ + protected static final byte V9 = 9; + /** * @param col Source collection. * @param <T> Collection type. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java index cc183bf..cb4fc30 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java @@ -214,4 +214,19 @@ public interface IgniteWriteAheadLogManager extends GridCacheSharedManager, Igni * @param grpId Group id. */ public boolean disabled(int grpId); + + /** + * Getting local WAL segment size. + * + * @param idx Absolute segment index. + * @return Segment size, {@code 0} if size is unknown. + */ + long segmentSize(long idx); + + /** + * Get last written pointer. + * + * @return Last written pointer. + */ + WALPointer lastWritePointer(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java index c1981c6..f40d4d7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java @@ -760,7 +760,7 @@ public class GridCacheSharedContext<K, V> { /** * @return Write ahead log manager. */ - public IgniteWriteAheadLogManager wal() { + @Nullable public IgniteWriteAheadLogManager wal() { return walMgr; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java index f265376..2f088d1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java @@ -25,14 +25,12 @@ import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO; import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; import org.apache.ignite.internal.util.typedef.internal.SB; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; /** * WAL file descriptor. */ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRecordsIterator.AbstractFileDescriptor { - /** file extension of WAL segment. */ private static final String WAL_SEGMENT_FILE_EXT = ".wal"; @@ -50,15 +48,17 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe * * @param file WAL segment file. */ - public FileDescriptor(@NotNull File file) { + public FileDescriptor(File file) { this(file, null); } /** + * Creates file descriptor. + * * @param file WAL segment file. * @param idx Absolute WAL segment file index. For null value index is restored from file name. */ - public FileDescriptor(@NotNull File file, @Nullable Long idx) { + public FileDescriptor(File file, @Nullable Long idx) { this.file = file; String fileName = file.getName(); @@ -69,13 +69,15 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe } /** - * @param segment Segment index. + * Getting segment file name. + * + * @param idx Segment index. * @return Segment file name. */ - public static String fileName(long segment) { + public static String fileName(long idx) { SB b = new SB(); - String segmentStr = Long.toString(segment); + String segmentStr = Long.toString(idx); for (int i = segmentStr.length(); i < WAL_SEGMENT_FILE_NAME_LENGTH; i++) b.a('0'); @@ -86,7 +88,7 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe } /** {@inheritDoc} */ - @Override public int compareTo(@NotNull FileDescriptor o) { + @Override public int compareTo(FileDescriptor o) { return Long.compare(idx, o.idx); } @@ -109,14 +111,18 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe } /** - * @return Absolute WAL segment file index + * Return absolute WAL segment file index. + * + * @return Absolute WAL segment file index. */ public long getIdx() { return idx; } /** - * @return absolute pathname string of this file descriptor pathname. + * Return absolute pathname string of this file descriptor pathname. + * + * @return Absolute pathname string of this file descriptor pathname. */ public String getAbsolutePath() { return file.getAbsolutePath(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 70da8e9..a92168b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -44,6 +44,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; @@ -86,7 +87,6 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; @@ -106,7 +106,6 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; -import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; @@ -127,7 +126,6 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; -import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import static java.nio.file.StandardOpenOption.CREATE; @@ -144,11 +142,14 @@ import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED; import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; import static org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition; import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel; /** * File WAL manager. @@ -262,7 +263,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private final boolean alwaysWriteFullPages; - /** WAL segment size in bytes. . This is maximum value, actual segments may be shorter. */ + /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */ private final long maxWalSegmentSize; /** @@ -295,10 +296,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Persistence metrics tracker. */ private DataStorageMetricsImpl metrics; - /** */ + /** WAL work directory (including consistent ID as subfolder). */ private File walWorkDir; - /** WAL archive directory (including consistent ID as subfolder) */ + /** WAL archive directory (including consistent ID as subfolder). */ private File walArchiveDir; /** Serializer of latest version, used to read header record and for write records */ @@ -317,7 +318,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Holder of actual information of latest manipulation on WAL segments. */ private volatile SegmentAware segmentAware; - /** Updater for {@link #currHnd}, used for verify there are no concurrent update for current log segment handle */ + /** Updater for {@link #currHnd}, used for verify there are no concurrent update for current log segment handle. */ private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> CURR_HND_UPD = AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currHnd"); @@ -328,10 +329,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Nullable private FileArchiver archiver; /** Compressor. */ - private FileCompressor compressor; + @Nullable private FileCompressor compressor; /** Decompressor. */ - private FileDecompressor decompressor; + @Nullable private FileDecompressor decompressor; /** Current log segment handle. */ private volatile FileWriteHandle currHnd; @@ -384,7 +385,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final FileHandleManagerFactory fileHandleManagerFactory; /** Switch segment record offset. */ - private final AtomicLongArray switchSegmentRecordOffset; + @Nullable private final AtomicLongArray switchSegmentRecordOffset; /** Page snapshot records compression algorithm. */ private DiskPageCompression pageCompression; @@ -393,9 +394,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private int pageCompressionLevel; /** + * Local segment sizes: absolute segment index -> size in bytes. + * For segments from {@link #walWorkDir} and {@link #walArchiveDir}. + * If there is a raw and compressed segment, compressed size is getting. + */ + private final Map<Long, Long> segmentSize = new ConcurrentHashMap<>(); + + /** * @param ctx Kernal context. */ - public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) { + public FileWriteAheadLogManager(final GridKernalContext ctx) { igCfg = ctx.config(); DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration(); @@ -467,8 +475,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl checkOrPrepareFiles(); - if (metrics != null) + if (metrics != null) { metrics.setWalSizeProvider(new CO<Long>() { + /** {@inheritDoc} */ @Override public Long apply() { long size = 0; @@ -481,6 +490,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return size; } }); + } segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled()); @@ -520,8 +530,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl cctx.kernalContext().compress().checkPageCompressionSupported(); pageCompressionLevel = dsCfg.getWalPageCompressionLevel() != null ? - CompressionProcessor.checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(), pageCompression) : - CompressionProcessor.getDefaultCompressionLevel(pageCompression); + checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(), pageCompression) : + getDefaultCompressionLevel(pageCompression); } } } @@ -585,10 +595,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl List<File> res = new ArrayList<>(); for (long i = low.index(); i < high.index(); i++) { - String segmentName = FileDescriptor.fileName(i); + String segmentName = fileName(i); File file = new File(walArchiveDir, segmentName); - File fileZip = new File(walArchiveDir, segmentName + FilePageStoreManager.ZIP_SUFFIX); + File fileZip = new File(walArchiveDir, segmentName + ZIP_SUFFIX); if (file.exists()) res.add(file); @@ -640,7 +650,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl fileHandleManager.onDeactivate(); } catch (Exception e) { - U.error(log, "Failed to gracefully close WAL segment: " + this.currHnd, e); + U.error(log, "Failed to gracefully close WAL segment: " + currHnd, e); } segmentAware.interrupt(); @@ -691,13 +701,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void resumeLogging(WALPointer filePtr) throws IgniteCheckedException { - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("File write ahead log manager resuming logging [nodeId=" + cctx.localNodeId() + " topVer=" + cctx.discovery().topologyVersionEx() + " ]"); + } - /* - walDisableContext is started after FileWriteAheadLogManager, so we obtain actual walDisableContext ref here. - */ + // walDisableContext is started after FileWriteAheadLogManager, so we obtain actual walDisableContext ref here. synchronized (this) { walDisableContext = cctx.walState().walDisableContext(); } @@ -711,17 +720,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl fileHandleManager.resumeLogging(); - currHnd = restoreWriteHandle(filePtr); + updateCurrentHandle(restoreWriteHandle(filePtr), null); // For new handle write serializer version to it. if (filePtr == null) currHnd.writeHeader(); if (currHnd.serializerVersion() != serializer.version()) { - if (log.isInfoEnabled()) + if (log.isInfoEnabled()) { log.info("Record serializer version change detected, will start logging with a new WAL record " + "serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() + ", oldVer=" + currHnd.serializerVersion() + ']'); + } rollOver(currHnd, null); } @@ -1010,9 +1020,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return {@code true} if has this index. */ private boolean hasIndex(long absIdx) { - String segmentName = FileDescriptor.fileName(absIdx); + String segmentName = fileName(absIdx); - String zipSegmentName = FileDescriptor.fileName(absIdx) + FilePageStoreManager.ZIP_SUFFIX; + String zipSegmentName = segmentName + ZIP_SUFFIX; boolean inArchive = new File(walArchiveDir, segmentName).exists() || new File(walArchiveDir, zipSegmentName).exists(); @@ -1053,12 +1063,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // We need to leave at least one archived segment to correctly determine the archive index. if (desc.idx < high.index() && desc.idx < lastArchived) { - if (!desc.file.delete()) + if (!desc.file.delete()) { U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + desc.file.getAbsolutePath()); - else + } + else { deleted++; + segmentSize.remove(desc.idx()); + } + // Bump up the oldest archive segment index. if (segmentAware.lastTruncatedArchiveIdx() < desc.idx) segmentAware.lastTruncatedArchiveIdx(desc.idx); @@ -1174,11 +1188,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param file File to read. * @param ioFactory IO factory. */ - private FileDescriptor readFileDescriptor(File file, FileIOFactory ioFactory) { + @Nullable private FileDescriptor readFileDescriptor(File file, FileIOFactory ioFactory) { FileDescriptor ds = new FileDescriptor(file); try (SegmentIO fileIO = ds.toIO(ioFactory)) { - // File may be empty when LOG_ONLY mode is enabled and mmap is disabled + // File may be empty when LOG_ONLY mode is enabled and mmap is disabled. if (fileIO.size() == 0) return null; @@ -1283,9 +1297,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) cctx.database().forceCheckpoint("too big size of WAL without checkpoint"); - boolean swapped = CURR_HND_UPD.compareAndSet(this, hnd, next); - - assert swapped : "Concurrent updates on rollover are not allowed"; + assert updateCurrentHandle(next, hnd) : "Concurrent updates on rollover are not allowed"; if (walAutoArchiveAfterInactivity > 0) lastRecordLoggedMs.set(0); @@ -1313,14 +1325,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return Initialized file write handle. * @throws StorageException If failed to initialize WAL write handle. */ - private FileWriteHandle restoreWriteHandle(WALPointer lastReadPtr) throws StorageException { + private FileWriteHandle restoreWriteHandle(@Nullable WALPointer lastReadPtr) throws StorageException { long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); @Nullable FileArchiver archiver0 = archiver; long segNo = archiver0 == null ? absIdx : absIdx % dsCfg.getWalSegments(); - File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo)); + File curFile = new File(walWorkDir, fileName(segNo)); int off = lastReadPtr == null ? 0 : lastReadPtr.fileOffset(); int len = lastReadPtr == null ? 0 : lastReadPtr.length(); @@ -1348,9 +1360,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer); - if (log.isInfoEnabled()) + if (log.isInfoEnabled()) { log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() + ", offset=" + off + ", ver=" + serVer + ']'); + } FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off + len, ser); @@ -1359,6 +1372,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl else segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1); + // Getting segment sizes. + F.asList(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)).stream() + .map(FileDescriptor::new) + .forEach(fd -> { + if (fd.isCompressed()) + segmentSize.put(fd.idx(), fd.file().length()); + else + segmentSize.putIfAbsent(fd.idx(), fd.file().length()); + }); + + // If walArchiveDir != walWorkDir, then need to get size of all segments that were not in archive. + // For example, absIdx == 8, and there are 0-4 segments in archive, then we need to get sizes of 5-7 segments. + // Size of the 8th segment will be set in #resumeLogging. + if (archiver0 != null) { + for (long i = absIdx - (absIdx % dsCfg.getWalSegments()); i < absIdx; i++) + segmentSize.putIfAbsent(i, maxWalSegmentSize); + } + return hnd; } catch (IgniteCheckedException | IOException e) { @@ -1467,25 +1498,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (!F.isEmpty(tmpFiles)) { for (File tmp : tmpFiles) { - boolean deleted = tmp.delete(); - - if (!deleted) + if (!tmp.delete()) { throw new StorageException("Failed to delete previously created temp file " + "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath()); + } } } } File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER); - if (isArchiverEnabled()) - if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments()) - throw new StorageException("Failed to initialize wal (work directory contains " + - "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); + if (isArchiverEnabled() && !F.isEmpty(allFiles) && allFiles.length > dsCfg.getWalSegments()) { + throw new StorageException("Failed to initialize wal (work directory contains incorrect " + + "number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); + } // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. - if (allFiles.length == 0) { - File first = new File(walWorkDir, FileDescriptor.fileName(0)); + if (F.isEmpty(allFiles)) { + File first = new File(walWorkDir, fileName(0)); createFile(first); } @@ -1575,7 +1605,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (archiver0 == null) { segmentAware.setLastArchivedAbsoluteIndex(curIdx); - return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1)); + return new File(walWorkDir, fileName(curIdx + 1)); } long absNextIdxStartTime = System.nanoTime(); @@ -1598,7 +1628,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl long segmentIdx = absNextIdx % dsCfg.getWalSegments(); - return new File(walWorkDir, FileDescriptor.fileName(segmentIdx)); + return new File(walWorkDir, fileName(segmentIdx)); } /** @@ -1638,7 +1668,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @return Sorted WAL files descriptors. */ - public static FileDescriptor[] scan(File[] allFiles) { + public static FileDescriptor[] scan(@Nullable File[] allFiles) { if (allFiles == null) return EMPTY_DESCRIPTORS; @@ -1701,7 +1731,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private int formatted; /** + * Constructor. * + * @param segmentAware Segment aware. + * @param log Logger. */ private FileArchiver(SegmentAware segmentAware, IgniteLogger log) throws IgniteCheckedException { super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log, @@ -1711,6 +1744,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Initialization. + * * @param segmentAware Segment aware. * @throws IgniteCheckedException If initialization failed. */ @@ -1737,13 +1772,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { try { - long idx = Long.parseLong(file.getName().substring(0, 16)); + long idx = new FileDescriptor(file).idx(); FileDescriptor desc = readFileDescriptor(file, ioFactory); if (desc != null) { if (desc.idx() == idx) - archiveIndices.put(desc.idx(), desc); + archiveIndices.put(idx, desc); } else log.warning("Skip file, failed read file header " + file); @@ -1762,7 +1797,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // Try to find min and max if we have skipped range semgnets in archive. Find firs gap. for (Long idx : archiveIndices.descendingKeySet()) { - if (!archiveIndices.keySet().contains(idx - 1)) + if (!archiveIndices.containsKey(idx - 1)) return F.t(idx, max); } @@ -1964,41 +1999,41 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Moves WAL segment from work folder to archive folder. Temp file is used to do movement + * Moves WAL segment from work folder to archive folder. Temp file is used to do movement. * * @param absIdx Absolute index to archive. + * @throws StorageException If failed. */ public SegmentArchiveResult archiveSegment(long absIdx) throws StorageException { long segIdx = absIdx % dsCfg.getWalSegments(); - File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx)); + File origFile = new File(walWorkDir, fileName(segIdx)); - String name = FileDescriptor.fileName(absIdx); + String name = fileName(absIdx); File dstTmpFile = new File(walArchiveDir, name + TMP_SUFFIX); File dstFile = new File(walArchiveDir, name); - if (log.isInfoEnabled()) + if (log.isInfoEnabled()) { log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx + ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']'); + } try { Files.deleteIfExists(dstTmpFile.toPath()); boolean copied = false; - if (switchSegmentRecordOffset != null) { - long offs = switchSegmentRecordOffset.get((int)segIdx); + long offs = switchSegmentRecordOffset.get((int)segIdx); - if (offs > 0) { - switchSegmentRecordOffset.set((int)segIdx, 0); + if (offs > 0) { + switchSegmentRecordOffset.set((int)segIdx, 0); - if (offs < origFile.length()) { - GridFileUtils.copy(ioFactory, origFile, ioFactory, dstTmpFile, offs); + if (offs < origFile.length()) { + GridFileUtils.copy(ioFactory, origFile, ioFactory, dstTmpFile, offs); - copied = true; - } + copied = true; } } @@ -2012,6 +2047,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl f0.force(); } } + + segmentSize.put(absIdx, dstFile.length()); } catch (IOException e) { throw new StorageException("Failed to archive WAL segment [" + @@ -2019,9 +2056,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e); } - if (log.isInfoEnabled()) + if (log.isInfoEnabled()) { log.info("Copied file [src=" + origFile.getAbsolutePath() + ", dst=" + dstFile.getAbsolutePath() + ']'); + } return new SegmentArchiveResult(absIdx, origFile, dstFile); } @@ -2078,7 +2116,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Workers queue. */ private final List<FileCompressorWorker> workers = new ArrayList<>(); - /** */ + /** + * Constructor. + * + * @param log Logger. + */ FileCompressor(IgniteLogger log) { super(0, log); @@ -2215,12 +2257,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl deleteObsoleteRawSegments(); - File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(segIdx) - + FilePageStoreManager.ZIP_SUFFIX + TMP_SUFFIX); + String segmentFileName = fileName(segIdx); - File zip = new File(walArchiveDir, FileDescriptor.fileName(segIdx) + FilePageStoreManager.ZIP_SUFFIX); + File tmpZip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX + TMP_SUFFIX); - File raw = new File(walArchiveDir, FileDescriptor.fileName(segIdx)); + File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX); + + File raw = new File(walArchiveDir, segmentFileName); if (!Files.exists(raw.toPath())) throw new IgniteCheckedException("WAL archive segment is missing: " + raw); @@ -2235,13 +2278,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl segmentAware.onSegmentCompressed(segIdx); - if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) { - evt.record(new WalSegmentCompactedEvent( - cctx.localNode(), - segIdx, - zip.getAbsoluteFile()) - ); - } + if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && !cctx.kernalContext().recoveryMode()) + evt.record(new WalSegmentCompactedEvent(cctx.localNode(), segIdx, zip.getAbsoluteFile())); } catch (IgniteInterruptedCheckedException ignore) { Thread.currentThread().interrupt(); @@ -2250,7 +2288,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl lastCompressionError = e; U.error(log, "Compression of WAL segment [idx=" + segIdx + - "] was skipped due to unexpected error", lastCompressionError); + "] was skipped due to unexpected error", lastCompressionError); segmentAware.onSegmentCompressed(segIdx); } @@ -2262,26 +2300,30 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param nextSegment Next segment absolute idx. - * @param raw Raw file. - * @param zip Zip file. + * Segment compression. + * + * @param idx Segment absolute index. + * @param raw Raw segment file. + * @param zip Zip file to writing. + * @throws IOException If failed. + * @throws IgniteCheckedException If failed. */ - private void compressSegmentToFile(long nextSegment, File raw, File zip) - throws IOException, IgniteCheckedException { - int segmentSerializerVer; + private void compressSegmentToFile(long idx, File raw, File zip) throws IOException, IgniteCheckedException { + int serializerVer; try (FileIO fileIO = ioFactory.create(raw)) { - segmentSerializerVer = readSegmentHeader(new SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion(); + serializerVer = readSegmentHeader(new SegmentIO(idx, fileIO), segmentFileInputFactory) + .getSerializerVersion(); } try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) { zos.setLevel(dsCfg.getWalCompactionLevel()); - zos.putNextEntry(new ZipEntry(nextSegment + ".wal")); + zos.putNextEntry(new ZipEntry(idx + ".wal")); ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE); buf.order(ByteOrder.nativeOrder()); - zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true, buf).array()); + zos.write(prepareSerializerVersionBuffer(idx, serializerVer, true, buf).array()); final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() { @Override public void applyx(WALRecord record) throws IgniteCheckedException { @@ -2297,32 +2339,36 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl }; try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator( - log, cctx, ioFactory, BUF_SIZE, nextSegment, walArchiveDir, appendToZipC)) { + log, cctx, ioFactory, BUF_SIZE, idx, walArchiveDir, appendToZipC)) { while (iter.hasNextX()) iter.nextX(); } - RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(segmentSerializerVer); + RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer); - ByteBuffer heapBuf = prepareSwitchSegmentRecordBuffer(nextSegment, ser); + ByteBuffer heapBuf = prepareSwitchSegmentRecordBuffer(idx, ser); zos.write(heapBuf.array()); } + + segmentSize.put(idx, zip.length()); } /** - * @param nextSegment Segment index. + * @param idx Segment index. * @param ser Record Serializer. */ - @NotNull private ByteBuffer prepareSwitchSegmentRecordBuffer(long nextSegment, RecordSerializer ser) - throws IgniteCheckedException { + private ByteBuffer prepareSwitchSegmentRecordBuffer( + long idx, + RecordSerializer ser + ) throws IgniteCheckedException { SwitchSegmentRecord switchRecord = new SwitchSegmentRecord(); int switchRecordSize = ser.size(switchRecord); switchRecord.size(switchRecordSize); - switchRecord.position(new WALPointer(nextSegment, 0, switchRecordSize)); + switchRecord.position(new WALPointer(idx, 0, switchRecordSize)); ByteBuffer heapBuf = ByteBuffer.allocate(switchRecordSize); @@ -2353,9 +2399,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return; if (desc.idx < segmentAware.keepUncompressedIdxFrom() && duplicateIndices.contains(desc.idx)) { - if (desc.file.exists() && !desc.file.delete()) - U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + - desc.file.getAbsolutePath() + ", exists: " + desc.file.exists()); + if (desc.file.exists() && !desc.file.delete()) { + U.warn(log, "Failed to remove obsolete WAL segment " + + "(make sure the process has enough rights): " + desc.file.getAbsolutePath() + + ", exists: " + desc.file.exists()); + } } } } @@ -2403,11 +2451,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (isCancelled()) break; - File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) - + FilePageStoreManager.ZIP_SUFFIX); - File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) - + TMP_SUFFIX); - File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); + String segmentFileName = fileName(segmentToDecompress); + + File zip = new File(walArchiveDir, segmentFileName + ZIP_SUFFIX); + File unzipTmp = new File(walArchiveDir, segmentFileName + TMP_SUFFIX); + File unzip = new File(walArchiveDir, segmentFileName); try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); FileIO io = ioFactory.create(unzipTmp)) { @@ -2475,7 +2523,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (decompressionFutures.containsKey(idx)) return decompressionFutures.get(idx); - File f = new File(walArchiveDir, FileDescriptor.fileName(idx)); + File f = new File(walArchiveDir, fileName(idx)); if (f.exists()) return new GridFinishedFuture<>(); @@ -2518,33 +2566,36 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param startWith Start with. * @param create Flag create file. * @param p Predicate Exit condition. + * @param completionCb Callback after verification segment. * @throws StorageException if validation or create file fail. */ private void checkFiles( int startWith, boolean create, @Nullable IgnitePredicate<Integer> p, - @Nullable IgniteInClosure<Integer> completionCallback + @Nullable IgniteInClosure<Integer> completionCb ) throws StorageException { for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || p.apply(i)); i++) { - File checkFile = new File(walWorkDir, FileDescriptor.fileName(i)); + File checkFile = new File(walWorkDir, fileName(i)); if (checkFile.exists()) { - if (checkFile.isDirectory()) + if (checkFile.isDirectory()) { throw new StorageException("Failed to initialize WAL log segment (a directory with " + "the same name already exists): " + checkFile.getAbsolutePath()); - else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC) + } + else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC) { throw new StorageException("Failed to initialize WAL log segment " + "(WAL segment size change is not supported in 'DEFAULT' WAL mode) " + "[filePath=" + checkFile.getAbsolutePath() + ", fileSize=" + checkFile.length() + ", configSize=" + dsCfg.getWalSegmentSize() + ']'); + } } else if (create) createFile(checkFile); - if (completionCallback != null) - completionCallback.apply(i); + if (completionCb != null) + completionCb.apply(i); } } @@ -2555,7 +2606,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param ver Version. * @param compacted Compacted flag. */ - @NotNull public static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted, ByteBuffer buf) { + public static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted, ByteBuffer buf) { // Write record type. buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1)); @@ -2712,7 +2763,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Nullable WALPointer start, @Nullable WALPointer end, DataStorageConfiguration dsCfg, - @NotNull RecordSerializerFactory serializerFactory, + RecordSerializerFactory serializerFactory, FileIOFactory ioFactory, @Nullable FileArchiver archiver, FileDecompressor decompressor, @@ -2742,15 +2793,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override protected ReadFileHandle initReadHandle( - @NotNull AbstractFileDescriptor desc, + AbstractFileDescriptor desc, @Nullable WALPointer start ) throws IgniteCheckedException, FileNotFoundException { AbstractFileDescriptor currDesc = desc; if (!desc.file().exists()) { FileDescriptor zipFile = new FileDescriptor( - new File(walArchiveDir, FileDescriptor.fileName(desc.idx()) - + FilePageStoreManager.ZIP_SUFFIX)); + new File(walArchiveDir, fileName(desc.idx()) + ZIP_SUFFIX)); if (!zipFile.file.exists()) { throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " + @@ -2902,10 +2952,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ - @Override protected IgniteCheckedException handleRecordException( - @NotNull Exception e, - @Nullable WALPointer ptr) { - + @Override protected IgniteCheckedException handleRecordException(Exception e, @Nullable WALPointer ptr) { if (e instanceof IgniteCheckedException) if (X.hasCause(e, IgniteDataIntegrityViolationException.class)) // This means that there is no explicit last sengment, so we iterate unil the very end. @@ -2971,12 +3018,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private boolean canIgnoreCrcError( long workIdx, long walSegmentIdx, - @NotNull Exception e, - @Nullable WALPointer ptr) { - FileDescriptor fd = new FileDescriptor( - new File(walWorkDir, FileDescriptor.fileName(workIdx)), - walSegmentIdx - ); + Exception e, + @Nullable WALPointer ptr + ) { + FileDescriptor fd = new FileDescriptor(new File(walWorkDir, fileName(workIdx)), walSegmentIdx); try { if (!fd.file().exists()) @@ -3023,7 +3068,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param walFilesDir directory to scan * @return found WAL file descriptors */ - public static FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException { + public static FileDescriptor[] loadFileDescriptors(final File walFilesDir) throws IgniteCheckedException { final File[] files = walFilesDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER); if (files == null) { @@ -3032,4 +3077,45 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } return scan(files); } + + /** {@inheritDoc} */ + @Override public long segmentSize(long idx) { + return segmentSize.getOrDefault(idx, 0L); + } + + /** {@inheritDoc} */ + @Override public WALPointer lastWritePointer() { + return currHnd.position(); + } + + /** + * Concurrent {@link #currHnd} update. + * + * @param n New handle. + * @param c Current handle, if not {@code null} CAS will be used. + * @return {@code True} if updated. + */ + private boolean updateCurrentHandle(FileWriteHandle n, @Nullable FileWriteHandle c) { + boolean res = true; + + if (c == null) + currHnd = n; + else + res = CURR_HND_UPD.compareAndSet(this, c, n); + + segmentSize.put(n.getSegmentId(), maxWalSegmentSize); + + return res; + } + + /** + * Check that file name matches segment name. + * + * @param name File name. + * @return {@code True} if file name matches segment name. + */ + public static boolean isSegmentFileName(@Nullable String name) { + return name != null && (WAL_NAME_PATTERN.matcher(name).matches() || + WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(name).matches()); + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java new file mode 100644 index 0000000..2854a2a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Consumer; +import java.util.stream.IntStream; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.Nullable; +import org.junit.Test; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER; +import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.isSegmentFileName; +import static org.apache.ignite.testframework.GridTestUtils.getFieldValue; + +/** + * Class for testing local size of WAL. + */ +public class IgniteLocalWalSizeTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + return super.getConfiguration(gridName) + .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)) + .setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalSegments(5) + .setWalSegmentSize((int)U.MB) + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)) + ); + } + + /** + * Checking correctness of working with local segment sizes for case: archiving only. + * + * @throws Exception If failed. + */ + @Test + public void testLocalSegmentSizesArchiveOnly() throws Exception { + checkLocalSegmentSizesForOneNode(null); + } + + /** + * Checking correctness of working with local segment sizes for case: archiving and compression. + * + * @throws Exception If failed. + */ + @Test + public void testLocalSegmentSizesArchiveAndCompression() throws Exception { + checkLocalSegmentSizesForOneNode(cfg -> cfg.getDataStorageConfiguration().setWalCompactionEnabled(true)); + } + + /** + * Checking correctness of working with local segment sizes for case: without archiving. + * + * @throws Exception If failed. + */ + @Test + public void testLocalSegmentSizesWithoutArchive() throws Exception { + checkLocalSegmentSizesForOneNode(cfg -> { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + dsCfg.setWalArchivePath(dsCfg.getWalPath()); + }); + } + + /** + * Checking correctness of working with local segment sizes for case: without archiving and with compression. + * + * @throws Exception If failed. + */ + @Test + public void testLocalSegmentSizesWithoutArchiveWithCompression() throws Exception { + checkLocalSegmentSizesForOneNode(cfg -> { + DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration(); + dsCfg.setWalArchivePath(dsCfg.getWalPath()).setWalCompactionEnabled(true); + }); + } + + /** + * Checking whether segment file name is checked correctly. + * + * @throws Exception If failed. + */ + @Test + public void testSegmentFileName() throws Exception { + Arrays.asList(null, "", "1", "wal", fileName(0) + "1", fileName(1).replace(".wal", ".wa")) + .forEach(s -> assertFalse(s, isSegmentFileName(s))); + + IntStream.range(0, 10) + .mapToObj(FileDescriptor::fileName) + .forEach(fn -> assertTrue(fn, isSegmentFileName(fn) && isSegmentFileName(fn + ZIP_SUFFIX))); + } + + /** + * Checks whether local segment sizes are working correctly for a single node after loading and restarting. + * + * @param cfgUpdater Configuration updater. + * @throws Exception If failed. + */ + private void checkLocalSegmentSizesForOneNode( + @Nullable Consumer<IgniteConfiguration> cfgUpdater + ) throws Exception { + IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(0)); + + if (cfgUpdater != null) + cfgUpdater.accept(cfg); + + IgniteEx n = startGrid(cfg); + n.cluster().state(ClusterState.ACTIVE); + + awaitPartitionMapExchange(); + + IgniteCache<Object, Object> c = n.getOrCreateCache(DEFAULT_CACHE_NAME); + IntStream.range(0, 10_000).forEach(i -> c.put(i, i)); + + forceCheckpoint(); + checkLocalSegmentSizes(n); + + stopGrid(cfg.getIgniteInstanceName()); + awaitPartitionMapExchange(); + + cfg = getConfiguration(cfg.getIgniteInstanceName()); + + if (cfgUpdater != null) + cfgUpdater.accept(cfg); + + // To avoid a race between compressor and getting the segment sizes. + if (cfg.getDataStorageConfiguration().isWalCompactionEnabled()) + cfg.getDataStorageConfiguration().setWalCompactionEnabled(false); + + n = startGrid(cfg); + awaitPartitionMapExchange(); + + checkLocalSegmentSizes(n); + } + + /** + * Check that local segment sizes in the memory and actual match. + * + * @param n Node. + */ + private void checkLocalSegmentSizes(IgniteEx n) { + FileWriteAheadLogManager wal = (FileWriteAheadLogManager)n.context().cache().context().wal(); + + File walWorkDir = getFieldValue(wal, "walWorkDir"); + File walArchiveDir = getFieldValue(wal, "walArchiveDir"); + + Map<Long, Long> expSegmentSize = new HashMap<>(); + + F.asList(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) + .stream() + .map(FileDescriptor::new) + .forEach(fd -> { + if (fd.isCompressed()) + expSegmentSize.put(fd.idx(), fd.file().length()); + else + expSegmentSize.putIfAbsent(fd.idx(), fd.file().length()); + }); + + FileWriteHandle currHnd = getFieldValue(wal, "currHnd"); + + if (!walArchiveDir.equals(walWorkDir)) { + long absIdx = currHnd.getSegmentId(); + int segments = n.configuration().getDataStorageConfiguration().getWalSegments(); + + for (long i = absIdx - (absIdx % segments); i <= absIdx; i++) + expSegmentSize.putIfAbsent(i, new File(walWorkDir, fileName(i % segments)).length()); + } + + assertEquals(currHnd.getSegmentId() + 1, expSegmentSize.size()); + + Map<Long, Long> segmentSize = getFieldValue(wal, "segmentSize"); + assertEquals(expSegmentSize.size(), segmentSize.size()); + + expSegmentSize.forEach((idx, size) -> { + assertEquals(idx.toString(), size, segmentSize.get(idx)); + assertEquals(idx.toString(), size.longValue(), wal.segmentSize(idx)); + }); + + assertEquals(0, wal.segmentSize(currHnd.getSegmentId() + 1)); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 3dedd8f..2e78ad0 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -188,4 +188,14 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { @Override public long maxArchivedSegmentToDelete() { return -1; } + + /** {@inheritDoc} */ + @Override public long segmentSize(long idx) { + return -1; + } + + /** {@inheritDoc} */ + @Override public WALPointer lastWritePointer() { + return null; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 8ff45e4..11b9ba5 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.Che import org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest; import org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.FsyncWalRolloverDoesNotBlockTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteLocalWalSizeTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteNodeStoppedDuringDisableWALTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALTailIsReachedDuringIterationOverArchiveTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushBackgroundSelfTest; @@ -231,5 +232,7 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, IgniteWalRebalanceLoggingTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, HistoricalRebalanceHeuristicsTest.class, ignoredTests); + + GridTestUtils.addTestIfNeeded(suite, IgniteLocalWalSizeTest.class, ignoredTests); } }