This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 0abf6fa  IGNITE-13613 API to get full WAL size and implementation to 
track WAL segments rollover and compression processes - Fixes #8388.
0abf6fa is described below

commit 0abf6fafe77ec447f2bfea1d7583a2b85b2ab192
Author: ktkalenko <ktkale...@gridgain.com>
AuthorDate: Fri Oct 30 09:51:55 2020 +0300

    IGNITE-13613 API to get full WAL size and implementation to track WAL 
segments rollover and compression processes - Fixes #8388.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../internal/dto/IgniteDataTransferObject.java     |   6 +
 .../pagemem/wal/IgniteWriteAheadLogManager.java    |  15 +
 .../processors/cache/GridCacheSharedContext.java   |   2 +-
 .../cache/persistence/wal/FileDescriptor.java      |  26 +-
 .../persistence/wal/FileWriteAheadLogManager.java  | 332 +++++++++++++--------
 .../persistence/db/wal/IgniteLocalWalSizeTest.java | 229 ++++++++++++++
 .../cache/persistence/pagemem/NoOpWALManager.java  |  10 +
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   3 +
 8 files changed, 489 insertions(+), 134 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
index 4d735d0..279586d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/dto/IgniteDataTransferObject.java
@@ -59,6 +59,12 @@ public abstract class IgniteDataTransferObject implements 
Externalizable {
     /** Version 7. */
     protected static final byte V7 = 7;
 
+    /** Version 8. */
+    protected static final byte V8 = 8;
+
+    /** Version 9. */
+    protected static final byte V9 = 9;
+
     /**
      * @param col Source collection.
      * @param <T> Collection type.
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index cc183bf..cb4fc30 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -214,4 +214,19 @@ public interface IgniteWriteAheadLogManager extends 
GridCacheSharedManager, Igni
      * @param grpId Group id.
      */
     public boolean disabled(int grpId);
+
+    /**
+     * Getting local WAL segment size.
+     *
+     * @param idx Absolute segment index.
+     * @return Segment size, {@code 0} if size is unknown.
+     */
+    long segmentSize(long idx);
+
+    /**
+     * Get last written pointer.
+     *
+     * @return Last written pointer.
+     */
+    WALPointer lastWritePointer();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
index c1981c6..f40d4d7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheSharedContext.java
@@ -760,7 +760,7 @@ public class GridCacheSharedContext<K, V> {
     /**
      * @return Write ahead log manager.
      */
-    public IgniteWriteAheadLogManager wal() {
+    @Nullable public IgniteWriteAheadLogManager wal() {
         return walMgr;
     }
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
index f265376..2f088d1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
@@ -25,14 +25,12 @@ import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import 
org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
 import org.apache.ignite.internal.util.typedef.internal.SB;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * WAL file descriptor.
  */
 public class FileDescriptor implements Comparable<FileDescriptor>, 
AbstractWalRecordsIterator.AbstractFileDescriptor {
-
     /** file extension of WAL segment. */
     private static final String WAL_SEGMENT_FILE_EXT = ".wal";
 
@@ -50,15 +48,17 @@ public class FileDescriptor implements 
Comparable<FileDescriptor>, AbstractWalRe
      *
      * @param file WAL segment file.
      */
-    public FileDescriptor(@NotNull File file) {
+    public FileDescriptor(File file) {
         this(file, null);
     }
 
     /**
+     *  Creates file descriptor.
+     *
      * @param file WAL segment file.
      * @param idx Absolute WAL segment file index. For null value index is 
restored from file name.
      */
-    public FileDescriptor(@NotNull File file, @Nullable Long idx) {
+    public FileDescriptor(File file, @Nullable Long idx) {
         this.file = file;
 
         String fileName = file.getName();
@@ -69,13 +69,15 @@ public class FileDescriptor implements 
Comparable<FileDescriptor>, AbstractWalRe
     }
 
     /**
-     * @param segment Segment index.
+     * Getting segment file name.
+     *
+     * @param idx Segment index.
      * @return Segment file name.
      */
-    public static String fileName(long segment) {
+    public static String fileName(long idx) {
         SB b = new SB();
 
-        String segmentStr = Long.toString(segment);
+        String segmentStr = Long.toString(idx);
 
         for (int i = segmentStr.length(); i < WAL_SEGMENT_FILE_NAME_LENGTH; 
i++)
             b.a('0');
@@ -86,7 +88,7 @@ public class FileDescriptor implements 
Comparable<FileDescriptor>, AbstractWalRe
     }
 
     /** {@inheritDoc} */
-    @Override public int compareTo(@NotNull FileDescriptor o) {
+    @Override public int compareTo(FileDescriptor o) {
         return Long.compare(idx, o.idx);
     }
 
@@ -109,14 +111,18 @@ public class FileDescriptor implements 
Comparable<FileDescriptor>, AbstractWalRe
     }
 
     /**
-     * @return Absolute WAL segment file index
+     * Return absolute WAL segment file index.
+     *
+     * @return Absolute WAL segment file index.
      */
     public long getIdx() {
         return idx;
     }
 
     /**
-     * @return absolute pathname string of this file descriptor pathname.
+     * Return absolute pathname string of this file descriptor pathname.
+     *
+     * @return Absolute pathname string of this file descriptor pathname.
      */
     public String getAbsolutePath() {
         return file.getAbsolutePath();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 70da8e9..a92168b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicLongArray;
@@ -86,7 +87,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabase
 import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
@@ -106,7 +106,6 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
-import org.apache.ignite.internal.processors.compress.CompressionProcessor;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -127,7 +126,6 @@ import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
-import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import static java.nio.file.StandardOpenOption.CREATE;
@@ -144,11 +142,14 @@ import static 
org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
 import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readPosition;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
+import static 
org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds;
+import static 
org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel;
 
 /**
  * File WAL manager.
@@ -262,7 +263,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /** */
     private final boolean alwaysWriteFullPages;
 
-    /** WAL segment size in bytes. . This is maximum value, actual segments 
may be shorter. */
+    /** WAL segment size in bytes. This is maximum value, actual segments may 
be shorter. */
     private final long maxWalSegmentSize;
 
     /**
@@ -295,10 +296,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /** Persistence metrics tracker. */
     private DataStorageMetricsImpl metrics;
 
-    /** */
+    /** WAL work directory (including consistent ID as subfolder). */
     private File walWorkDir;
 
-    /** WAL archive directory (including consistent ID as subfolder) */
+    /** WAL archive directory (including consistent ID as subfolder). */
     private File walArchiveDir;
 
     /** Serializer of latest version, used to read header record and for write 
records */
@@ -317,7 +318,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /** Holder of actual information of latest manipulation on WAL segments. */
     private volatile SegmentAware segmentAware;
 
-    /** Updater for {@link #currHnd}, used for verify there are no concurrent 
update for current log segment handle */
+    /** Updater for {@link #currHnd}, used for verify there are no concurrent 
update for current log segment handle. */
     private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, 
FileWriteHandle> CURR_HND_UPD =
         AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, 
FileWriteHandle.class, "currHnd");
 
@@ -328,10 +329,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     @Nullable private FileArchiver archiver;
 
     /** Compressor. */
-    private FileCompressor compressor;
+    @Nullable private FileCompressor compressor;
 
     /** Decompressor. */
-    private FileDecompressor decompressor;
+    @Nullable private FileDecompressor decompressor;
 
     /** Current log segment handle. */
     private volatile FileWriteHandle currHnd;
@@ -384,7 +385,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private final FileHandleManagerFactory fileHandleManagerFactory;
 
     /** Switch segment record offset. */
-    private final AtomicLongArray switchSegmentRecordOffset;
+    @Nullable private final AtomicLongArray switchSegmentRecordOffset;
 
     /** Page snapshot records compression algorithm. */
     private DiskPageCompression pageCompression;
@@ -393,9 +394,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private int pageCompressionLevel;
 
     /**
+     * Local segment sizes: absolute segment index -> size in bytes.
+     * For segments from {@link #walWorkDir} and {@link #walArchiveDir}.
+     * If there is a raw and compressed segment, compressed size is getting.
+     */
+    private final Map<Long, Long> segmentSize = new ConcurrentHashMap<>();
+
+    /**
      * @param ctx Kernal context.
      */
-    public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
+    public FileWriteAheadLogManager(final GridKernalContext ctx) {
         igCfg = ctx.config();
 
         DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration();
@@ -467,8 +475,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             checkOrPrepareFiles();
 
-            if (metrics != null)
+            if (metrics != null) {
                 metrics.setWalSizeProvider(new CO<Long>() {
+                    /** {@inheritDoc} */
                     @Override public Long apply() {
                         long size = 0;
 
@@ -481,6 +490,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         return size;
                     }
                 });
+            }
 
             segmentAware = new SegmentAware(dsCfg.getWalSegments(), 
dsCfg.isWalCompactionEnabled());
 
@@ -520,8 +530,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 
cctx.kernalContext().compress().checkPageCompressionSupported();
 
                 pageCompressionLevel = dsCfg.getWalPageCompressionLevel() != 
null ?
-                    
CompressionProcessor.checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(),
 pageCompression) :
-                    
CompressionProcessor.getDefaultCompressionLevel(pageCompression);
+                    
checkCompressionLevelBounds(dsCfg.getWalPageCompressionLevel(), 
pageCompression) :
+                    getDefaultCompressionLevel(pageCompression);
             }
         }
     }
@@ -585,10 +595,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         List<File> res = new ArrayList<>();
 
         for (long i = low.index(); i < high.index(); i++) {
-            String segmentName = FileDescriptor.fileName(i);
+            String segmentName = fileName(i);
 
             File file = new File(walArchiveDir, segmentName);
-            File fileZip = new File(walArchiveDir, segmentName + 
FilePageStoreManager.ZIP_SUFFIX);
+            File fileZip = new File(walArchiveDir, segmentName + ZIP_SUFFIX);
 
             if (file.exists())
                 res.add(file);
@@ -640,7 +650,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             fileHandleManager.onDeactivate();
         }
         catch (Exception e) {
-            U.error(log, "Failed to gracefully close WAL segment: " + 
this.currHnd, e);
+            U.error(log, "Failed to gracefully close WAL segment: " + currHnd, 
e);
         }
 
         segmentAware.interrupt();
@@ -691,13 +701,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
     /** {@inheritDoc} */
     @Override public void resumeLogging(WALPointer filePtr) throws 
IgniteCheckedException {
-        if (log.isDebugEnabled())
+        if (log.isDebugEnabled()) {
             log.debug("File write ahead log manager resuming logging [nodeId=" 
+ cctx.localNodeId() +
                 " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
+        }
 
-        /*
-            walDisableContext is started after FileWriteAheadLogManager, so we 
obtain actual walDisableContext ref here.
-         */
+        // walDisableContext is started after FileWriteAheadLogManager, so we 
obtain actual walDisableContext ref here.
         synchronized (this) {
             walDisableContext = cctx.walState().walDisableContext();
         }
@@ -711,17 +720,18 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         fileHandleManager.resumeLogging();
 
-        currHnd = restoreWriteHandle(filePtr);
+        updateCurrentHandle(restoreWriteHandle(filePtr), null);
 
         // For new handle write serializer version to it.
         if (filePtr == null)
             currHnd.writeHeader();
 
         if (currHnd.serializerVersion() != serializer.version()) {
-            if (log.isInfoEnabled())
+            if (log.isInfoEnabled()) {
                 log.info("Record serializer version change detected, will 
start logging with a new WAL record " +
                     "serializer to a new WAL segment [curFile=" + currHnd + ", 
newVer=" + serializer.version() +
                     ", oldVer=" + currHnd.serializerVersion() + ']');
+            }
 
             rollOver(currHnd, null);
         }
@@ -1010,9 +1020,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @return {@code true} if has this index.
      */
     private boolean hasIndex(long absIdx) {
-        String segmentName = FileDescriptor.fileName(absIdx);
+        String segmentName = fileName(absIdx);
 
-        String zipSegmentName = FileDescriptor.fileName(absIdx) + 
FilePageStoreManager.ZIP_SUFFIX;
+        String zipSegmentName = segmentName + ZIP_SUFFIX;
 
         boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
             new File(walArchiveDir, zipSegmentName).exists();
@@ -1053,12 +1063,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             // We need to leave at least one archived segment to correctly 
determine the archive index.
             if (desc.idx < high.index() && desc.idx < lastArchived) {
-                if (!desc.file.delete())
+                if (!desc.file.delete()) {
                     U.warn(log, "Failed to remove obsolete WAL segment (make 
sure the process has enough rights): " +
                         desc.file.getAbsolutePath());
-                else
+                }
+                else {
                     deleted++;
 
+                    segmentSize.remove(desc.idx());
+                }
+
                 // Bump up the oldest archive segment index.
                 if (segmentAware.lastTruncatedArchiveIdx() < desc.idx)
                     segmentAware.lastTruncatedArchiveIdx(desc.idx);
@@ -1174,11 +1188,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @param file File to read.
      * @param ioFactory IO factory.
      */
-    private FileDescriptor readFileDescriptor(File file, FileIOFactory 
ioFactory) {
+    @Nullable private FileDescriptor readFileDescriptor(File file, 
FileIOFactory ioFactory) {
         FileDescriptor ds = new FileDescriptor(file);
 
         try (SegmentIO fileIO = ds.toIO(ioFactory)) {
-            // File may be empty when LOG_ONLY mode is enabled and mmap is 
disabled
+            // File may be empty when LOG_ONLY mode is enabled and mmap is 
disabled.
             if (fileIO.size() == 0)
                 return null;
 
@@ -1283,9 +1297,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             if (next.getSegmentId() - lashCheckpointFileIdx() >= 
maxSegCountWithoutCheckpoint)
                 cctx.database().forceCheckpoint("too big size of WAL without 
checkpoint");
 
-            boolean swapped = CURR_HND_UPD.compareAndSet(this, hnd, next);
-
-            assert swapped : "Concurrent updates on rollover are not allowed";
+            assert updateCurrentHandle(next, hnd) : "Concurrent updates on 
rollover are not allowed";
 
             if (walAutoArchiveAfterInactivity > 0)
                 lastRecordLoggedMs.set(0);
@@ -1313,14 +1325,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @return Initialized file write handle.
      * @throws StorageException If failed to initialize WAL write handle.
      */
-    private FileWriteHandle restoreWriteHandle(WALPointer lastReadPtr) throws 
StorageException {
+    private FileWriteHandle restoreWriteHandle(@Nullable WALPointer 
lastReadPtr) throws StorageException {
         long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
 
         @Nullable FileArchiver archiver0 = archiver;
 
         long segNo = archiver0 == null ? absIdx : absIdx % 
dsCfg.getWalSegments();
 
-        File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));
+        File curFile = new File(walWorkDir, fileName(segNo));
 
         int off = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
         int len = lastReadPtr == null ? 0 : lastReadPtr.length();
@@ -1348,9 +1360,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                 RecordSerializer ser = new 
RecordSerializerFactoryImpl(cctx).createSerializer(serVer);
 
-                if (log.isInfoEnabled())
+                if (log.isInfoEnabled()) {
                     log.info("Resuming logging to WAL segment [file=" + 
curFile.getAbsolutePath() +
                         ", offset=" + off + ", ver=" + serVer + ']');
+                }
 
                 FileWriteHandle hnd = fileHandleManager.initHandle(fileIO, off 
+ len, ser);
 
@@ -1359,6 +1372,24 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 else
                     segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1);
 
+                // Getting segment sizes.
+                
F.asList(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)).stream()
+                    .map(FileDescriptor::new)
+                    .forEach(fd -> {
+                        if (fd.isCompressed())
+                            segmentSize.put(fd.idx(), fd.file().length());
+                        else
+                            segmentSize.putIfAbsent(fd.idx(), 
fd.file().length());
+                    });
+
+                // If walArchiveDir != walWorkDir, then need to get size of 
all segments that were not in archive.
+                // For example, absIdx == 8, and there are 0-4 segments in 
archive, then we need to get sizes of 5-7 segments.
+                // Size of the 8th segment will be set in #resumeLogging.
+                if (archiver0 != null) {
+                    for (long i = absIdx - (absIdx % dsCfg.getWalSegments()); 
i < absIdx; i++)
+                        segmentSize.putIfAbsent(i, maxWalSegmentSize);
+                }
+
                 return hnd;
             }
             catch (IgniteCheckedException | IOException e) {
@@ -1467,25 +1498,24 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             if (!F.isEmpty(tmpFiles)) {
                 for (File tmp : tmpFiles) {
-                    boolean deleted = tmp.delete();
-
-                    if (!deleted)
+                    if (!tmp.delete()) {
                         throw new StorageException("Failed to delete 
previously created temp file " +
                             "(make sure Ignite process has enough rights): " + 
tmp.getAbsolutePath());
+                    }
                 }
             }
         }
 
         File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
 
-        if (isArchiverEnabled())
-            if (allFiles.length != 0 && allFiles.length > 
dsCfg.getWalSegments())
-                throw new StorageException("Failed to initialize wal (work 
directory contains " +
-                    "incorrect number of segments) [cur=" + allFiles.length + 
", expected=" + dsCfg.getWalSegments() + ']');
+        if (isArchiverEnabled() && !F.isEmpty(allFiles) && allFiles.length > 
dsCfg.getWalSegments()) {
+            throw new StorageException("Failed to initialize wal (work 
directory contains incorrect " +
+                "number of segments) [cur=" + allFiles.length + ", expected=" 
+ dsCfg.getWalSegments() + ']');
+        }
 
         // Allocate the first segment synchronously. All other segments will 
be allocated by archiver in background.
-        if (allFiles.length == 0) {
-            File first = new File(walWorkDir, FileDescriptor.fileName(0));
+        if (F.isEmpty(allFiles)) {
+            File first = new File(walWorkDir, fileName(0));
 
             createFile(first);
         }
@@ -1575,7 +1605,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         if (archiver0 == null) {
             segmentAware.setLastArchivedAbsoluteIndex(curIdx);
 
-            return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1));
+            return new File(walWorkDir, fileName(curIdx + 1));
         }
 
         long absNextIdxStartTime = System.nanoTime();
@@ -1598,7 +1628,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         long segmentIdx = absNextIdx % dsCfg.getWalSegments();
 
-        return new File(walWorkDir, FileDescriptor.fileName(segmentIdx));
+        return new File(walWorkDir, fileName(segmentIdx));
     }
 
     /**
@@ -1638,7 +1668,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /**
      * @return Sorted WAL files descriptors.
      */
-    public static FileDescriptor[] scan(File[] allFiles) {
+    public static FileDescriptor[] scan(@Nullable File[] allFiles) {
         if (allFiles == null)
             return EMPTY_DESCRIPTORS;
 
@@ -1701,7 +1731,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         private int formatted;
 
         /**
+         * Constructor.
          *
+         * @param segmentAware Segment aware.
+         * @param log Logger.
          */
         private FileArchiver(SegmentAware segmentAware, IgniteLogger log) 
throws IgniteCheckedException {
             super(cctx.igniteInstanceName(), "wal-file-archiver%" + 
cctx.igniteInstanceName(), log,
@@ -1711,6 +1744,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /**
+         * Initialization.
+         *
          * @param segmentAware Segment aware.
          * @throws IgniteCheckedException If initialization failed.
          */
@@ -1737,13 +1772,13 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             for (File file : 
walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
                 try {
-                    long idx = Long.parseLong(file.getName().substring(0, 16));
+                    long idx = new FileDescriptor(file).idx();
 
                     FileDescriptor desc = readFileDescriptor(file, ioFactory);
 
                     if (desc != null) {
                         if (desc.idx() == idx)
-                            archiveIndices.put(desc.idx(), desc);
+                            archiveIndices.put(idx, desc);
                     }
                     else
                         log.warning("Skip file, failed read file header " + 
file);
@@ -1762,7 +1797,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                 // Try to find min and max if we have skipped range semgnets 
in archive. Find firs gap.
                 for (Long idx : archiveIndices.descendingKeySet()) {
-                    if (!archiveIndices.keySet().contains(idx - 1))
+                    if (!archiveIndices.containsKey(idx - 1))
                         return F.t(idx, max);
                 }
 
@@ -1964,41 +1999,41 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * Moves WAL segment from work folder to archive folder. Temp file is 
used to do movement
+         * Moves WAL segment from work folder to archive folder. Temp file is 
used to do movement.
          *
          * @param absIdx Absolute index to archive.
+         * @throws StorageException If failed.
          */
         public SegmentArchiveResult archiveSegment(long absIdx) throws 
StorageException {
             long segIdx = absIdx % dsCfg.getWalSegments();
 
-            File origFile = new File(walWorkDir, 
FileDescriptor.fileName(segIdx));
+            File origFile = new File(walWorkDir, fileName(segIdx));
 
-            String name = FileDescriptor.fileName(absIdx);
+            String name = fileName(absIdx);
 
             File dstTmpFile = new File(walArchiveDir, name + TMP_SUFFIX);
 
             File dstFile = new File(walArchiveDir, name);
 
-            if (log.isInfoEnabled())
+            if (log.isInfoEnabled()) {
                 log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", 
segIdx=" + segIdx +
                     ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" 
+ dstFile.getAbsolutePath() + ']');
+            }
 
             try {
                 Files.deleteIfExists(dstTmpFile.toPath());
 
                 boolean copied = false;
 
-                if (switchSegmentRecordOffset != null) {
-                    long offs = switchSegmentRecordOffset.get((int)segIdx);
+                long offs = switchSegmentRecordOffset.get((int)segIdx);
 
-                    if (offs > 0) {
-                        switchSegmentRecordOffset.set((int)segIdx, 0);
+                if (offs > 0) {
+                    switchSegmentRecordOffset.set((int)segIdx, 0);
 
-                        if (offs < origFile.length()) {
-                            GridFileUtils.copy(ioFactory, origFile, ioFactory, 
dstTmpFile, offs);
+                    if (offs < origFile.length()) {
+                        GridFileUtils.copy(ioFactory, origFile, ioFactory, 
dstTmpFile, offs);
 
-                            copied = true;
-                        }
+                        copied = true;
                     }
                 }
 
@@ -2012,6 +2047,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         f0.force();
                     }
                 }
+
+                segmentSize.put(absIdx, dstFile.length());
             }
             catch (IOException e) {
                 throw new StorageException("Failed to archive WAL segment [" +
@@ -2019,9 +2056,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
             }
 
-            if (log.isInfoEnabled())
+            if (log.isInfoEnabled()) {
                 log.info("Copied file [src=" + origFile.getAbsolutePath() +
                     ", dst=" + dstFile.getAbsolutePath() + ']');
+            }
 
             return new SegmentArchiveResult(absIdx, origFile, dstFile);
         }
@@ -2078,7 +2116,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         /** Workers queue. */
         private final List<FileCompressorWorker> workers = new ArrayList<>();
 
-        /** */
+        /**
+         * Constructor.
+         *
+         * @param log Logger.
+         */
         FileCompressor(IgniteLogger log) {
             super(0, log);
 
@@ -2215,12 +2257,13 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                     deleteObsoleteRawSegments();
 
-                    File tmpZip = new File(walArchiveDir, 
FileDescriptor.fileName(segIdx)
-                            + FilePageStoreManager.ZIP_SUFFIX + TMP_SUFFIX);
+                    String segmentFileName = fileName(segIdx);
 
-                    File zip = new File(walArchiveDir, 
FileDescriptor.fileName(segIdx) + FilePageStoreManager.ZIP_SUFFIX);
+                    File tmpZip = new File(walArchiveDir, segmentFileName + 
ZIP_SUFFIX + TMP_SUFFIX);
 
-                    File raw = new File(walArchiveDir, 
FileDescriptor.fileName(segIdx));
+                    File zip = new File(walArchiveDir, segmentFileName + 
ZIP_SUFFIX);
+
+                    File raw = new File(walArchiveDir, segmentFileName);
 
                     if (!Files.exists(raw.toPath()))
                         throw new IgniteCheckedException("WAL archive segment 
is missing: " + raw);
@@ -2235,13 +2278,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                     segmentAware.onSegmentCompressed(segIdx);
 
-                    if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && 
!cctx.kernalContext().recoveryMode()) {
-                        evt.record(new WalSegmentCompactedEvent(
-                                cctx.localNode(),
-                                segIdx,
-                                zip.getAbsoluteFile())
-                        );
-                    }
+                    if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED) && 
!cctx.kernalContext().recoveryMode())
+                        evt.record(new 
WalSegmentCompactedEvent(cctx.localNode(), segIdx, zip.getAbsoluteFile()));
                 }
                 catch (IgniteInterruptedCheckedException ignore) {
                     Thread.currentThread().interrupt();
@@ -2250,7 +2288,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     lastCompressionError = e;
 
                     U.error(log, "Compression of WAL segment [idx=" + segIdx +
-                            "] was skipped due to unexpected error", 
lastCompressionError);
+                        "] was skipped due to unexpected error", 
lastCompressionError);
 
                     segmentAware.onSegmentCompressed(segIdx);
                 }
@@ -2262,26 +2300,30 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param nextSegment Next segment absolute idx.
-         * @param raw Raw file.
-         * @param zip Zip file.
+         * Segment compression.
+         *
+         * @param idx Segment absolute index.
+         * @param raw Raw segment file.
+         * @param zip Zip file to writing.
+         * @throws IOException If failed.
+         * @throws IgniteCheckedException If failed.
          */
-        private void compressSegmentToFile(long nextSegment, File raw, File 
zip)
-                throws IOException, IgniteCheckedException {
-            int segmentSerializerVer;
+        private void compressSegmentToFile(long idx, File raw, File zip) 
throws IOException, IgniteCheckedException {
+            int serializerVer;
 
             try (FileIO fileIO = ioFactory.create(raw)) {
-                segmentSerializerVer = readSegmentHeader(new 
SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion();
+                serializerVer = readSegmentHeader(new SegmentIO(idx, fileIO), 
segmentFileInputFactory)
+                    .getSerializerVersion();
             }
 
             try (ZipOutputStream zos = new ZipOutputStream(new 
BufferedOutputStream(new FileOutputStream(zip)))) {
                 zos.setLevel(dsCfg.getWalCompactionLevel());
-                zos.putNextEntry(new ZipEntry(nextSegment + ".wal"));
+                zos.putNextEntry(new ZipEntry(idx + ".wal"));
 
                 ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE);
                 buf.order(ByteOrder.nativeOrder());
 
-                zos.write(prepareSerializerVersionBuffer(nextSegment, 
segmentSerializerVer, true, buf).array());
+                zos.write(prepareSerializerVersionBuffer(idx, serializerVer, 
true, buf).array());
 
                 final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() {
                     @Override public void applyx(WALRecord record) throws 
IgniteCheckedException {
@@ -2297,32 +2339,36 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 };
 
                 try (SingleSegmentLogicalRecordsIterator iter = new 
SingleSegmentLogicalRecordsIterator(
-                        log, cctx, ioFactory, BUF_SIZE, nextSegment, 
walArchiveDir, appendToZipC)) {
+                    log, cctx, ioFactory, BUF_SIZE, idx, walArchiveDir, 
appendToZipC)) {
 
                     while (iter.hasNextX())
                         iter.nextX();
                 }
 
-                RecordSerializer ser = new 
RecordSerializerFactoryImpl(cctx).createSerializer(segmentSerializerVer);
+                RecordSerializer ser = new 
RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer);
 
-                ByteBuffer heapBuf = 
prepareSwitchSegmentRecordBuffer(nextSegment, ser);
+                ByteBuffer heapBuf = prepareSwitchSegmentRecordBuffer(idx, 
ser);
 
                 zos.write(heapBuf.array());
             }
+
+            segmentSize.put(idx, zip.length());
         }
 
         /**
-         * @param nextSegment Segment index.
+         * @param idx Segment index.
          * @param ser Record Serializer.
          */
-        @NotNull private ByteBuffer prepareSwitchSegmentRecordBuffer(long 
nextSegment, RecordSerializer ser)
-                throws IgniteCheckedException {
+        private ByteBuffer prepareSwitchSegmentRecordBuffer(
+            long idx,
+            RecordSerializer ser
+        ) throws IgniteCheckedException {
             SwitchSegmentRecord switchRecord = new SwitchSegmentRecord();
 
             int switchRecordSize = ser.size(switchRecord);
             switchRecord.size(switchRecordSize);
 
-            switchRecord.position(new WALPointer(nextSegment, 0, 
switchRecordSize));
+            switchRecord.position(new WALPointer(idx, 0, switchRecordSize));
 
             ByteBuffer heapBuf = ByteBuffer.allocate(switchRecordSize);
 
@@ -2353,9 +2399,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     return;
 
                 if (desc.idx < segmentAware.keepUncompressedIdxFrom() && 
duplicateIndices.contains(desc.idx)) {
-                    if (desc.file.exists() && !desc.file.delete())
-                        U.warn(log, "Failed to remove obsolete WAL segment 
(make sure the process has enough rights): " +
-                                desc.file.getAbsolutePath() + ", exists: " + 
desc.file.exists());
+                    if (desc.file.exists() && !desc.file.delete()) {
+                        U.warn(log, "Failed to remove obsolete WAL segment " +
+                            "(make sure the process has enough rights): " + 
desc.file.getAbsolutePath() +
+                            ", exists: " + desc.file.exists());
+                    }
                 }
             }
         }
@@ -2403,11 +2451,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         if (isCancelled())
                             break;
 
-                        File zip = new File(walArchiveDir, 
FileDescriptor.fileName(segmentToDecompress)
-                            + FilePageStoreManager.ZIP_SUFFIX);
-                        File unzipTmp = new File(walArchiveDir, 
FileDescriptor.fileName(segmentToDecompress)
-                            + TMP_SUFFIX);
-                        File unzip = new File(walArchiveDir, 
FileDescriptor.fileName(segmentToDecompress));
+                        String segmentFileName = fileName(segmentToDecompress);
+
+                        File zip = new File(walArchiveDir, segmentFileName + 
ZIP_SUFFIX);
+                        File unzipTmp = new File(walArchiveDir, 
segmentFileName + TMP_SUFFIX);
+                        File unzip = new File(walArchiveDir, segmentFileName);
 
                         try (ZipInputStream zis = new ZipInputStream(new 
BufferedInputStream(new FileInputStream(zip)));
                              FileIO io = ioFactory.create(unzipTmp)) {
@@ -2475,7 +2523,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             if (decompressionFutures.containsKey(idx))
                 return decompressionFutures.get(idx);
 
-            File f = new File(walArchiveDir, FileDescriptor.fileName(idx));
+            File f = new File(walArchiveDir, fileName(idx));
 
             if (f.exists())
                 return new GridFinishedFuture<>();
@@ -2518,33 +2566,36 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @param startWith Start with.
      * @param create Flag create file.
      * @param p Predicate Exit condition.
+     * @param completionCb Callback after verification segment.
      * @throws StorageException if validation or create file fail.
      */
     private void checkFiles(
         int startWith,
         boolean create,
         @Nullable IgnitePredicate<Integer> p,
-        @Nullable IgniteInClosure<Integer> completionCallback
+        @Nullable IgniteInClosure<Integer> completionCb
     ) throws StorageException {
         for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || 
p.apply(i)); i++) {
-            File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
+            File checkFile = new File(walWorkDir, fileName(i));
 
             if (checkFile.exists()) {
-                if (checkFile.isDirectory())
+                if (checkFile.isDirectory()) {
                     throw new StorageException("Failed to initialize WAL log 
segment (a directory with " +
                         "the same name already exists): " + 
checkFile.getAbsolutePath());
-                else if (checkFile.length() != dsCfg.getWalSegmentSize() && 
mode == WALMode.FSYNC)
+                }
+                else if (checkFile.length() != dsCfg.getWalSegmentSize() && 
mode == WALMode.FSYNC) {
                     throw new StorageException("Failed to initialize WAL log 
segment " +
                         "(WAL segment size change is not supported in 
'DEFAULT' WAL mode) " +
                         "[filePath=" + checkFile.getAbsolutePath() +
                         ", fileSize=" + checkFile.length() +
                         ", configSize=" + dsCfg.getWalSegmentSize() + ']');
+                }
             }
             else if (create)
                 createFile(checkFile);
 
-            if (completionCallback != null)
-                completionCallback.apply(i);
+            if (completionCb != null)
+                completionCb.apply(i);
         }
     }
 
@@ -2555,7 +2606,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @param ver Version.
      * @param compacted Compacted flag.
      */
-    @NotNull public static ByteBuffer prepareSerializerVersionBuffer(long idx, 
int ver, boolean compacted, ByteBuffer buf) {
+    public static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, 
boolean compacted, ByteBuffer buf) {
         // Write record type.
         buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
 
@@ -2712,7 +2763,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             @Nullable WALPointer start,
             @Nullable WALPointer end,
             DataStorageConfiguration dsCfg,
-            @NotNull RecordSerializerFactory serializerFactory,
+            RecordSerializerFactory serializerFactory,
             FileIOFactory ioFactory,
             @Nullable FileArchiver archiver,
             FileDecompressor decompressor,
@@ -2742,15 +2793,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override protected ReadFileHandle initReadHandle(
-            @NotNull AbstractFileDescriptor desc,
+            AbstractFileDescriptor desc,
             @Nullable WALPointer start
         ) throws IgniteCheckedException, FileNotFoundException {
             AbstractFileDescriptor currDesc = desc;
 
             if (!desc.file().exists()) {
                 FileDescriptor zipFile = new FileDescriptor(
-                    new File(walArchiveDir, FileDescriptor.fileName(desc.idx())
-                        + FilePageStoreManager.ZIP_SUFFIX));
+                    new File(walArchiveDir, fileName(desc.idx()) + 
ZIP_SUFFIX));
 
                 if (!zipFile.file.exists()) {
                     throw new FileNotFoundException("Both compressed and raw 
segment files are missing in archive " +
@@ -2902,10 +2952,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /** {@inheritDoc} */
-        @Override protected IgniteCheckedException handleRecordException(
-            @NotNull Exception e,
-            @Nullable WALPointer ptr) {
-
+        @Override protected IgniteCheckedException 
handleRecordException(Exception e, @Nullable WALPointer ptr) {
             if (e instanceof IgniteCheckedException)
                 if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
                     // This means that there is no explicit last sengment, so 
we iterate unil the very end.
@@ -2971,12 +3018,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         private boolean canIgnoreCrcError(
             long workIdx,
             long walSegmentIdx,
-            @NotNull Exception e,
-            @Nullable WALPointer ptr) {
-            FileDescriptor fd = new FileDescriptor(
-                new File(walWorkDir, FileDescriptor.fileName(workIdx)),
-                walSegmentIdx
-            );
+            Exception e,
+            @Nullable WALPointer ptr
+        ) {
+            FileDescriptor fd = new FileDescriptor(new File(walWorkDir, 
fileName(workIdx)), walSegmentIdx);
 
             try {
                 if (!fd.file().exists())
@@ -3023,7 +3068,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @param walFilesDir directory to scan
      * @return found WAL file descriptors
      */
-    public static FileDescriptor[] loadFileDescriptors(@NotNull final File 
walFilesDir) throws IgniteCheckedException {
+    public static FileDescriptor[] loadFileDescriptors(final File walFilesDir) 
throws IgniteCheckedException {
         final File[] files = 
walFilesDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
 
         if (files == null) {
@@ -3032,4 +3077,45 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
         return scan(files);
     }
+
+    /** {@inheritDoc} */
+    @Override public long segmentSize(long idx) {
+        return segmentSize.getOrDefault(idx, 0L);
+    }
+
+    /** {@inheritDoc} */
+    @Override public WALPointer lastWritePointer() {
+        return currHnd.position();
+    }
+
+    /**
+     * Concurrent {@link #currHnd} update.
+     *
+     * @param n New handle.
+     * @param c Current handle, if not {@code null} CAS will be used.
+     * @return {@code True} if updated.
+     */
+    private boolean updateCurrentHandle(FileWriteHandle n, @Nullable 
FileWriteHandle c) {
+        boolean res = true;
+
+        if (c == null)
+            currHnd = n;
+        else
+            res = CURR_HND_UPD.compareAndSet(this, c, n);
+
+        segmentSize.put(n.getSegmentId(), maxWalSegmentSize);
+
+        return res;
+    }
+
+    /**
+     * Check that file name matches segment name.
+     *
+     * @param name File name.
+     * @return {@code True} if file name matches segment name.
+     */
+    public static boolean isSegmentFileName(@Nullable String name) {
+        return name != null && (WAL_NAME_PATTERN.matcher(name).matches() ||
+            WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(name).matches());
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java
new file mode 100644
index 0000000..2854a2a
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteLocalWalSizeTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.filehandle.FileWriteHandle;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.ZIP_SUFFIX;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.isSegmentFileName;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+
+/**
+ * Class for testing local size of WAL.
+ */
+public class IgniteLocalWalSizeTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        return super.getConfiguration(gridName)
+            .setCacheConfiguration(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME))
+            .setDataStorageConfiguration(
+                new DataStorageConfiguration()
+                    .setWalSegments(5)
+                    .setWalSegmentSize((int)U.MB)
+                    .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration().setPersistenceEnabled(true))
+            );
+    }
+
+    /**
+     * Checking correctness of working with local segment sizes for case: 
archiving only.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLocalSegmentSizesArchiveOnly() throws Exception {
+        checkLocalSegmentSizesForOneNode(null);
+    }
+
+    /**
+     * Checking correctness of working with local segment sizes for case: 
archiving and compression.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLocalSegmentSizesArchiveAndCompression() throws Exception {
+        checkLocalSegmentSizesForOneNode(cfg -> 
cfg.getDataStorageConfiguration().setWalCompactionEnabled(true));
+    }
+
+    /**
+     * Checking correctness of working with local segment sizes for case: 
without archiving.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLocalSegmentSizesWithoutArchive() throws Exception {
+        checkLocalSegmentSizesForOneNode(cfg -> {
+            DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+            dsCfg.setWalArchivePath(dsCfg.getWalPath());
+        });
+    }
+
+    /**
+     * Checking correctness of working with local segment sizes for case: 
without archiving and with compression.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testLocalSegmentSizesWithoutArchiveWithCompression() throws 
Exception {
+        checkLocalSegmentSizesForOneNode(cfg -> {
+            DataStorageConfiguration dsCfg = cfg.getDataStorageConfiguration();
+            
dsCfg.setWalArchivePath(dsCfg.getWalPath()).setWalCompactionEnabled(true);
+        });
+    }
+
+    /**
+     * Checking whether segment file name is checked correctly.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testSegmentFileName() throws Exception {
+        Arrays.asList(null, "", "1", "wal", fileName(0) + "1", 
fileName(1).replace(".wal", ".wa"))
+            .forEach(s -> assertFalse(s, isSegmentFileName(s)));
+
+        IntStream.range(0, 10)
+            .mapToObj(FileDescriptor::fileName)
+            .forEach(fn -> assertTrue(fn, isSegmentFileName(fn) && 
isSegmentFileName(fn + ZIP_SUFFIX)));
+    }
+
+    /**
+     * Checks whether local segment sizes are working correctly for a single 
node after loading and restarting.
+     *
+     * @param cfgUpdater Configuration updater.
+     * @throws Exception If failed.
+     */
+    private void checkLocalSegmentSizesForOneNode(
+        @Nullable Consumer<IgniteConfiguration> cfgUpdater
+    ) throws Exception {
+        IgniteConfiguration cfg = 
getConfiguration(getTestIgniteInstanceName(0));
+
+        if (cfgUpdater != null)
+            cfgUpdater.accept(cfg);
+
+        IgniteEx n = startGrid(cfg);
+        n.cluster().state(ClusterState.ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        IgniteCache<Object, Object> c = n.getOrCreateCache(DEFAULT_CACHE_NAME);
+        IntStream.range(0, 10_000).forEach(i -> c.put(i, i));
+
+        forceCheckpoint();
+        checkLocalSegmentSizes(n);
+
+        stopGrid(cfg.getIgniteInstanceName());
+        awaitPartitionMapExchange();
+
+        cfg = getConfiguration(cfg.getIgniteInstanceName());
+
+        if (cfgUpdater != null)
+            cfgUpdater.accept(cfg);
+
+        // To avoid a race between compressor and getting the segment sizes.
+        if (cfg.getDataStorageConfiguration().isWalCompactionEnabled())
+            cfg.getDataStorageConfiguration().setWalCompactionEnabled(false);
+
+        n = startGrid(cfg);
+        awaitPartitionMapExchange();
+
+        checkLocalSegmentSizes(n);
+    }
+
+    /**
+     * Check that local segment sizes in the memory and actual match.
+     *
+     * @param n Node.
+     */
+    private void checkLocalSegmentSizes(IgniteEx n) {
+        FileWriteAheadLogManager wal = 
(FileWriteAheadLogManager)n.context().cache().context().wal();
+
+        File walWorkDir = getFieldValue(wal, "walWorkDir");
+        File walArchiveDir = getFieldValue(wal, "walArchiveDir");
+
+        Map<Long, Long> expSegmentSize = new HashMap<>();
+
+        
F.asList(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER))
+            .stream()
+            .map(FileDescriptor::new)
+            .forEach(fd -> {
+                if (fd.isCompressed())
+                    expSegmentSize.put(fd.idx(), fd.file().length());
+                else
+                    expSegmentSize.putIfAbsent(fd.idx(), fd.file().length());
+            });
+
+        FileWriteHandle currHnd = getFieldValue(wal, "currHnd");
+
+        if (!walArchiveDir.equals(walWorkDir)) {
+            long absIdx = currHnd.getSegmentId();
+            int segments = 
n.configuration().getDataStorageConfiguration().getWalSegments();
+
+            for (long i = absIdx - (absIdx % segments); i <= absIdx; i++)
+                expSegmentSize.putIfAbsent(i, new File(walWorkDir, fileName(i 
% segments)).length());
+        }
+
+        assertEquals(currHnd.getSegmentId() + 1, expSegmentSize.size());
+
+        Map<Long, Long> segmentSize = getFieldValue(wal, "segmentSize");
+        assertEquals(expSegmentSize.size(), segmentSize.size());
+
+        expSegmentSize.forEach((idx, size) -> {
+            assertEquals(idx.toString(), size, segmentSize.get(idx));
+            assertEquals(idx.toString(), size.longValue(), 
wal.segmentSize(idx));
+        });
+
+        assertEquals(0, wal.segmentSize(currHnd.getSegmentId() + 1));
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 3dedd8f..2e78ad0 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -188,4 +188,14 @@ public class NoOpWALManager implements 
IgniteWriteAheadLogManager {
     @Override public long maxArchivedSegmentToDelete() {
         return -1;
     }
+
+    /** {@inheritDoc} */
+    @Override public long segmentSize(long idx) {
+        return -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override public WALPointer lastWritePointer() {
+        return null;
+    }
 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 8ff45e4..11b9ba5 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -56,6 +56,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.Che
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.FsyncWalRolloverDoesNotBlockTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteLocalWalSizeTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteNodeStoppedDuringDisableWALTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWALTailIsReachedDuringIterationOverArchiveTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushBackgroundSelfTest;
@@ -231,5 +232,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, 
IgniteWalRebalanceLoggingTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
HistoricalRebalanceHeuristicsTest.class, ignoredTests);
+
+        GridTestUtils.addTestIfNeeded(suite, IgniteLocalWalSizeTest.class, 
ignoredTests);
     }
 }

Reply via email to