IGNITE-9693 Scale up wal compression workers to increase performance - Fixes 
#4831.

Signed-off-by: Ivan Rakov <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/036bd074
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/036bd074
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/036bd074

Branch: refs/heads/ignite-5797
Commit: 036bd074d8bfd25a2c4c463a60dde00604d11b9d
Parents: a748090
Author: Ivan Daschinskiy <[email protected]>
Authored: Fri Sep 28 18:03:45 2018 +0300
Committer: Ivan Rakov <[email protected]>
Committed: Fri Sep 28 18:10:01 2018 +0300

----------------------------------------------------------------------
 .../apache/ignite/IgniteSystemProperties.java   |   5 +
 .../pagemem/wal/IgniteWriteAheadLogManager.java |   3 +-
 .../GridCacheDatabaseSharedManager.java         |  11 +-
 .../wal/FileWriteAheadLogManager.java           | 208 ++++++++++++-------
 .../wal/FsyncModeFileWriteAheadLogManager.java  |   7 +-
 .../persistence/wal/aware/SegmentAware.java     |  28 ++-
 .../wal/aware/SegmentCompressStorage.java       |  80 +++++--
 ...PdsReserveWalSegmentsWithCompactionTest.java |  34 +++
 .../persistence/pagemem/NoOpWALManager.java     |   2 +-
 .../persistence/wal/aware/SegmentAwareTest.java |  90 +++++---
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   2 +
 11 files changed, 324 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java 
b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index 5932de0..01fb02a 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -901,6 +901,11 @@ public final class IgniteSystemProperties {
     public static final String IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE = 
"IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE";
 
     /**
+     * Count of WAL compressor worker threads. Default value is 4.
+     */
+    public static final String IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT = 
"IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT";
+
+    /**
      * Whenever read load balancing is enabled, that means 'get' requests will 
be distributed between primary and backup
      * nodes if it is possible and {@link CacheConfiguration#readFromBackup} 
is {@code true}.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
index 12fd3e9..4ffa347 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/IgniteWriteAheadLogManager.java
@@ -86,9 +86,8 @@ public interface IgniteWriteAheadLogManager extends 
GridCacheSharedManager, Igni
      * Invoke this method to reserve WAL history since provided pointer and 
prevent it's deletion.
      *
      * @param start WAL pointer.
-     * @throws IgniteException If failed to reserve.
      */
-    public boolean reserve(WALPointer start) throws IgniteCheckedException;
+    public boolean reserve(WALPointer start);
 
     /**
      * Invoke this method to release WAL history since provided pointer that 
was previously reserved.

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 158c3b1..5e0b7cb 100755
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -1758,16 +1758,7 @@ public class GridCacheDatabaseSharedManager extends 
IgniteCacheDatabaseSharedMan
         if (ptr == null)
             return false;
 
-        boolean reserved;
-
-        try {
-            reserved = cctx.wal().reserve(ptr);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Error while trying to reserve history", e);
-
-            reserved = false;
-        }
+        boolean reserved = cctx.wal().reserve(ptr);
 
         if (reserved)
             reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, 
ptr));

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 5d165fd..43dfb8f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -134,6 +134,7 @@ import static java.nio.file.StandardOpenOption.READ;
 import static java.nio.file.StandardOpenOption.WRITE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
+import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT;
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
@@ -257,6 +258,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE =
         
IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, 
0.5);
 
+    /**
+     * Number of WAL compressor worker threads.
+     */
+    private final int WAL_COMPRESSOR_WORKER_THREAD_CNT =
+            
IgniteSystemProperties.getInteger(IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT, 4);
+
     /** */
     private final boolean alwaysWriteFullPages;
 
@@ -415,7 +422,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         evt = ctx.event();
         failureProcessor = ctx.failure();
-        segmentAware = new SegmentAware(dsCfg.getWalSegments());
+        segmentAware = new SegmentAware(dsCfg.getWalSegments(), 
dsCfg.isWalCompactionEnabled());
     }
 
     /**
@@ -486,7 +493,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
 
             if (dsCfg.isWalCompactionEnabled()) {
-                compressor = new FileCompressor();
+                if (compressor == null)
+                    compressor = new FileCompressor(log);
 
                 if (decompressor == null) {  // Preventing of two 
file-decompressor thread instantiations.
                     decompressor = new FileDecompressor(log);
@@ -895,7 +903,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     }
 
     /** {@inheritDoc} */
-    @Override public boolean reserve(WALPointer start) throws 
IgniteCheckedException {
+    @Override public boolean reserve(WALPointer start) {
         assert start != null && start instanceof FileWALPointer : "Invalid 
start pointer: " + start;
 
         if (mode == WALMode.NONE)
@@ -1005,7 +1013,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     /** {@inheritDoc} */
     @Override public void notchLastCheckpointPtr(WALPointer ptr) {
         if (compressor != null)
-            compressor.keepUncompressedIdxFrom(((FileWALPointer)ptr).index());
+            
segmentAware.keepUncompressedIdxFrom(((FileWALPointer)ptr).index());
     }
 
     /** {@inheritDoc} */
@@ -1910,16 +1918,13 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * Responsible for compressing WAL archive segments.
      * Also responsible for deleting raw copies of already compressed WAL 
archive segments if they are not reserved.
      */
-    private class FileCompressor extends Thread {
-        /** Current thread stopping advice. */
-        private volatile boolean stopped;
-
-        /** All segments prior to this (inclusive) can be compressed. */
-        private volatile long minUncompressedIdxToKeep = -1L;
+    private class FileCompressor extends FileCompressorWorker {
+        /** Workers queue. */
+        List<FileCompressorWorker> workers = new ArrayList<>();
 
         /** */
-        FileCompressor() {
-            super("wal-file-compressor%" + cctx.igniteInstanceName());
+        FileCompressor(IgniteLogger log) {
+            super(0, log);
         }
 
         /** */
@@ -1927,7 +1932,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             File[] toDel = 
walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER);
 
             for (File f : toDel) {
-                if (stopped)
+                if (isCancelled())
                     return;
 
                 f.delete();
@@ -1936,82 +1941,118 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             FileDescriptor[] alreadyCompressed = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
 
             if (alreadyCompressed.length > 0)
-                
segmentAware.lastCompressedIdx(alreadyCompressed[alreadyCompressed.length - 
1].idx());
+                
segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 
1].idx());
+
+            for (int i = 1; i < calculateThreadCount(); i++) {
+                FileCompressorWorker worker = new FileCompressorWorker(i, log);
+
+                worker.start();
+
+                workers.add(worker);
+            }
         }
 
         /**
-         * @param idx Minimum raw segment index that should be preserved from 
deletion.
+         * Calculates the total number of compressor threads (this one included): WAL_COMPRESSOR_WORKER_THREAD_CNT,
+         * capped at a quarter of the available processors.
+         *
+         * @return Optimal number of compressor threads.
          */
-        void keepUncompressedIdxFrom(long idx) {
-            minUncompressedIdxToKeep = idx;
+        private int calculateThreadCount() {
+            int procNum = Runtime.getRuntime().availableProcessors();
+
+            // If a quarter of the available processors is at least WAL_COMPRESSOR_WORKER_THREAD_CNT,
+            // use the configured value. Otherwise, fall back to a quarter of the available processors.
+            if (procNum >> 2 >= WAL_COMPRESSOR_WORKER_THREAD_CNT)
+                return WAL_COMPRESSOR_WORKER_THREAD_CNT;
+            else
+                return procNum >> 2;
         }
 
-        /**
-         * Pessimistically tries to reserve segment for compression in order 
to avoid concurrent truncation.
-         * Waits if there's no segment to archive right now.
-         */
-        private long tryReserveNextSegmentOrWait() throws 
IgniteCheckedException {
-            long segmentToCompress = segmentAware.waitNextSegmentToCompress();
 
-            boolean reserved = reserve(new FileWALPointer(segmentToCompress, 
0, 0));
+        /** {@inheritDoc} */
+        @Override public void body() throws InterruptedException, 
IgniteInterruptedCheckedException{
+            init();
 
-            return reserved ? segmentToCompress : -1;
+            super.body0();
         }
 
         /**
-         * Deletes raw WAL segments if they aren't locked and already have 
compressed copies of themselves.
+         * @throws IgniteInterruptedCheckedException If failed to wait for 
thread shutdown.
          */
-        private void deleteObsoleteRawSegments() {
-            FileDescriptor[] descs = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
+        private void shutdown() throws IgniteInterruptedCheckedException {
+            synchronized (this) {
+                for (FileCompressorWorker worker: workers)
+                    U.cancel(worker);
 
-            Set<Long> indices = new HashSet<>();
-            Set<Long> duplicateIndices = new HashSet<>();
+                for (FileCompressorWorker worker: workers)
+                    U.join(worker);
 
-            for (FileDescriptor desc : descs) {
-                if (!indices.add(desc.idx))
-                    duplicateIndices.add(desc.idx);
+                U.cancel(this);
             }
 
-            for (FileDescriptor desc : descs) {
-                if (desc.isCompressed())
-                    continue;
+            U.join(this);
+        }
+    }
 
-                // Do not delete reserved or locked segment and any segment 
after it.
-                if (segmentReservedOrLocked(desc.idx))
-                    return;
+    /** */
+    private class FileCompressorWorker extends GridWorker {
+        /** */
+        private Thread thread;
 
-                if (desc.idx < minUncompressedIdxToKeep && 
duplicateIndices.contains(desc.idx)) {
-                    if (!desc.file.delete())
-                        U.warn(log, "Failed to remove obsolete WAL segment 
(make sure the process has enough rights): " +
-                            desc.file.getAbsolutePath() + ", exists: " + 
desc.file.exists());
-                }
-            }
+        /** */
+        FileCompressorWorker(int idx,  IgniteLogger log) {
+            super(cctx.igniteInstanceName(), "wal-file-compressor-%" + 
cctx.igniteInstanceName() + "%-" + idx, log);
+        }
+
+        /** */
+        void start() {
+            thread = new IgniteThread(this);
+
+            thread.start();
+        }
+
+        /**
+         * Pessimistically tries to reserve segment for compression in order 
to avoid concurrent truncation.
+         * Waits if there's no segment to archive right now.
+         */
+        private long tryReserveNextSegmentOrWait() throws 
IgniteInterruptedCheckedException{
+            long segmentToCompress = segmentAware.waitNextSegmentToCompress();
+
+            boolean reserved = reserve(new FileWALPointer(segmentToCompress, 
0, 0));
+
+            return reserved ? segmentToCompress : -1;
         }
 
         /** {@inheritDoc} */
-        @Override public void run() {
-            init();
+        @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+            body0();
+        }
 
-            while (!Thread.currentThread().isInterrupted() && !stopped) {
-                long currReservedSegment = -1;
+        /** */
+        private void body0() {
+            while (!isCancelled()) {
+                long segIdx = -1L;
 
                 try {
-                    deleteObsoleteRawSegments();
+                    segIdx = tryReserveNextSegmentOrWait();
 
-                    currReservedSegment = tryReserveNextSegmentOrWait();
-                    if (currReservedSegment == -1)
+                    if (segIdx <= segmentAware.lastCompressedIdx())
                         continue;
 
-                    File tmpZip = new File(walArchiveDir, 
FileDescriptor.fileName(currReservedSegment)
-                        + FilePageStoreManager.ZIP_SUFFIX + 
FilePageStoreManager.TMP_SUFFIX);
+                    deleteObsoleteRawSegments();
+
+                    File tmpZip = new File(walArchiveDir, 
FileDescriptor.fileName(segIdx)
+                            + FilePageStoreManager.ZIP_SUFFIX + 
FilePageStoreManager.TMP_SUFFIX);
 
-                    File zip = new File(walArchiveDir, 
FileDescriptor.fileName(currReservedSegment) + FilePageStoreManager.ZIP_SUFFIX);
+                    File zip = new File(walArchiveDir, 
FileDescriptor.fileName(segIdx) + FilePageStoreManager.ZIP_SUFFIX);
+
+                    File raw = new File(walArchiveDir, 
FileDescriptor.fileName(segIdx));
 
-                    File raw = new File(walArchiveDir, 
FileDescriptor.fileName(currReservedSegment));
                     if (!Files.exists(raw.toPath()))
                         throw new IgniteCheckedException("WAL archive segment 
is missing: " + raw);
 
-                    compressSegmentToFile(currReservedSegment, raw, tmpZip);
+                    compressSegmentToFile(segIdx, raw, tmpZip);
 
                     Files.move(tmpZip.toPath(), zip.toPath());
 
@@ -2022,27 +2063,27 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                         if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED)) {
                             evt.record(new WalSegmentCompactedEvent(
-                                cctx.discovery().localNode(),
-                                currReservedSegment,
-                                zip.getAbsoluteFile())
+                                    cctx.localNode(),
+                                    segIdx,
+                                    zip.getAbsoluteFile())
                             );
                         }
                     }
 
-                    segmentAware.lastCompressedIdx(currReservedSegment);
+                    segmentAware.onSegmentCompressed(segIdx);
                 }
                 catch (IgniteInterruptedCheckedException ignore) {
                     Thread.currentThread().interrupt();
                 }
                 catch (IgniteCheckedException | IOException e) {
-                    U.error(log, "Compression of WAL segment [idx=" + 
currReservedSegment +
-                        "] was skipped due to unexpected error", e);
+                    U.error(log, "Compression of WAL segment [idx=" + segIdx +
+                            "] was skipped due to unexpected error", e);
 
-                    segmentAware.lastCompressedIdx(currReservedSegment);
+                    segmentAware.onSegmentCompressed(segIdx);
                 }
                 finally {
-                    if (currReservedSegment != -1)
-                        release(new FileWALPointer(currReservedSegment, 0, 0));
+                    if (segIdx != -1L)
+                        release(new FileWALPointer(segIdx, 0, 0));
                 }
             }
         }
@@ -2053,7 +2094,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
          * @param zip Zip file.
          */
         private void compressSegmentToFile(long nextSegment, File raw, File 
zip)
-            throws IOException, IgniteCheckedException {
+                throws IOException, IgniteCheckedException {
             int segmentSerializerVer;
 
             try (FileIO fileIO = ioFactory.create(raw)) {
@@ -2083,7 +2124,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 };
 
                 try (SingleSegmentLogicalRecordsIterator iter = new 
SingleSegmentLogicalRecordsIterator(
-                    log, cctx, ioFactory, BUF_SIZE, nextSegment, 
walArchiveDir, appendToZipC)) {
+                        log, cctx, ioFactory, BUF_SIZE, nextSegment, 
walArchiveDir, appendToZipC)) {
 
                     while (iter.hasNextX())
                         iter.nextX();
@@ -2102,7 +2143,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
          * @param ser Record Serializer.
          */
         @NotNull private ByteBuffer prepareSwitchSegmentRecordBuffer(long 
nextSegment, RecordSerializer ser)
-            throws IgniteCheckedException {
+                throws IgniteCheckedException {
             SwitchSegmentRecord switchRecord = new SwitchSegmentRecord();
 
             int switchRecordSize = ser.size(switchRecord);
@@ -2117,16 +2158,33 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @throws IgniteInterruptedCheckedException If failed to wait for 
thread shutdown.
+         * Deletes raw WAL segments if they aren't locked and already have 
compressed copies of themselves.
          */
-        private void shutdown() throws IgniteInterruptedCheckedException {
-            synchronized (this) {
-                stopped = true;
+        private void deleteObsoleteRawSegments() {
+            FileDescriptor[] descs = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
 
-                notifyAll();
+            Set<Long> indices = new HashSet<>();
+            Set<Long> duplicateIndices = new HashSet<>();
+
+            for (FileDescriptor desc : descs) {
+                if (!indices.add(desc.idx))
+                    duplicateIndices.add(desc.idx);
             }
 
-            U.join(this);
+            for (FileDescriptor desc : descs) {
+                if (desc.isCompressed())
+                    continue;
+
+                // Do not delete reserved or locked segment and any segment 
after it.
+                if (segmentReservedOrLocked(desc.idx))
+                    return;
+
+                if (desc.idx < segmentAware.keepUncompressedIdxFrom() && 
duplicateIndices.contains(desc.idx)) {
+                    if (desc.file.exists() && !desc.file.delete())
+                        U.warn(log, "Failed to remove obsolete WAL segment 
(make sure the process has enough rights): " +
+                                desc.file.getAbsolutePath() + ", exists: " + 
desc.file.exists());
+                }
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index df8f4de..1c0325e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -783,7 +783,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
     }
 
     /** {@inheritDoc} */
-    @Override public boolean reserve(WALPointer start) throws 
IgniteCheckedException {
+    @Override public boolean reserve(WALPointer start) {
         assert start != null && start instanceof FileWALPointer : "Invalid 
start pointer: " + start;
 
         if (mode == WALMode.NONE)
@@ -791,8 +791,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
         FileArchiver archiver0 = archiver;
 
-        if (archiver0 == null)
-            throw new IgniteCheckedException("Could not reserve WAL segment: 
archiver == null");
+        assert archiver0 != null : "Could not reserve WAL segment: archiver == 
null";
 
         archiver0.reserve(((FileWALPointer)start).index());
 
@@ -1912,7 +1911,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
          * Pessimistically tries to reserve segment for compression in order 
to avoid concurrent truncation.
          * Waits if there's no segment to archive right now.
          */
-        private long tryReserveNextSegmentOrWait() throws 
InterruptedException, IgniteCheckedException {
+        private long tryReserveNextSegmentOrWait() throws InterruptedException 
{
             long segmentToCompress = lastCompressedIdx + 1;
 
             synchronized (this) {

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
index 3379b74..6ba0399 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -36,15 +36,17 @@ public class SegmentAware {
     /** Manages last archived index, emulates archivation in no-archiver mode. 
*/
     private final SegmentArchivedStorage segmentArchivedStorage = 
buildArchivedStorage(segmentLockStorage);
     /** Storage of actual information about current index of compressed 
segments. */
-    private final SegmentCompressStorage segmentCompressStorage = 
buildCompressStorage(segmentArchivedStorage);
+    private final SegmentCompressStorage segmentCompressStorage;
     /** Storage of absolute current segment index. */
     private final SegmentCurrentStateStorage segmentCurrStateStorage;
 
     /**
      * @param walSegmentsCnt Total WAL segments count.
+     * @param compactionEnabled Whether WAL compaction is enabled.
      */
-    public SegmentAware(int walSegmentsCnt) {
+    public SegmentAware(int walSegmentsCnt, boolean compactionEnabled) {
         segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, 
segmentArchivedStorage);
+        segmentCompressStorage = buildCompressStorage(segmentArchivedStorage, 
compactionEnabled);
     }
 
     /**
@@ -108,12 +110,12 @@ public class SegmentAware {
     }
 
     /**
-     * Force set last compressed segment.
+     * Callback invoked after segment compression finishes.
      *
-     * @param lastCompressedIdx Segment which was last compressed.
+     * @param compressedIdx Index of compressed segment.
      */
-    public void lastCompressedIdx(long lastCompressedIdx) {
-        segmentCompressStorage.lastCompressedIdx(lastCompressedIdx);
+    public void onSegmentCompressed(long compressedIdx) {
+        segmentCompressStorage.onSegmentCompressed(compressedIdx);
     }
 
     /**
@@ -124,6 +126,20 @@ public class SegmentAware {
     }
 
     /**
+     * @param idx Minimum raw segment index that should be preserved from 
deletion.
+     */
+    public void keepUncompressedIdxFrom(long idx) {
+        segmentCompressStorage.keepUncompressedIdxFrom(idx);
+    }
+
+    /**
+     * @return Minimum raw segment index that should be preserved from deletion.
+     */
+    public long keepUncompressedIdxFrom() {
+        return segmentCompressStorage.keepUncompressedIdxFrom();
+    }
+
+    /**
      * Update current WAL index.
      *
      * @param curAbsWalIdx New current WAL index.

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
index 30c9a2d..174fb46 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -18,6 +18,10 @@
 package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
 
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
 
 /**
  * Storage of actual information about current index of compressed segments.
@@ -25,25 +29,50 @@ import 
org.apache.ignite.internal.IgniteInterruptedCheckedException;
 public class SegmentCompressStorage {
     /** Flag of interrupt waiting on this object. */
     private volatile boolean interrupted;
+
     /** Manages last archived index, emulates archivation in no-archiver mode. 
*/
     private final SegmentArchivedStorage segmentArchivedStorage;
+
+    /** If WAL compaction enabled. */
+    private final boolean compactionEnabled;
+
     /** Last successfully compressed segment. */
     private volatile long lastCompressedIdx = -1L;
 
+    /** Last enqueued to compress segment. */
+    private long lastEnqueuedToCompressIdx = -1L;
+
+    /** Segments to compress queue. */
+    private final Queue<Long> segmentsToCompress = new ArrayDeque<>();
+
+    /** List of currently compressing segments. */
+    private final List<Long> compressingSegments = new ArrayList<>();
+
+    /** Compressed segment with maximal index. */
+    private long lastMaxCompressedIdx = -1L;
+
+    /** Min uncompressed index to keep. */
+    private volatile long minUncompressedIdxToKeep = -1L;
+
     /**
      * @param segmentArchivedStorage Storage of last archived segment.
+     * @param compactionEnabled If WAL compaction enabled.
      */
-    private SegmentCompressStorage(SegmentArchivedStorage 
segmentArchivedStorage) {
+    private SegmentCompressStorage(SegmentArchivedStorage 
segmentArchivedStorage, boolean compactionEnabled) {
         this.segmentArchivedStorage = segmentArchivedStorage;
 
+        this.compactionEnabled = compactionEnabled;
+
         this.segmentArchivedStorage.addObserver(this::onSegmentArchived);
     }
 
     /**
      * @param segmentArchivedStorage Storage of last archived segment.
+     * @param compactionEnabled If WAL compaction enabled.
      */
-    static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage 
segmentArchivedStorage) {
-        SegmentCompressStorage storage = new 
SegmentCompressStorage(segmentArchivedStorage);
+    static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage 
segmentArchivedStorage,
+                                                       boolean 
compactionEnabled) {
+        SegmentCompressStorage storage = new 
SegmentCompressStorage(segmentArchivedStorage, compactionEnabled);
 
         segmentArchivedStorage.addObserver(storage::onSegmentArchived);
 
@@ -51,12 +80,20 @@ public class SegmentCompressStorage {
     }
 
     /**
-     * Force set last compressed segment.
+     * Callback invoked after a segment has been compressed.
      *
-     * @param lastCompressedIdx Segment which was last compressed.
+     * @param compressedIdx Index of compressed segment.
      */
-    void lastCompressedIdx(long lastCompressedIdx) {
-        this.lastCompressedIdx = lastCompressedIdx;
+    synchronized void onSegmentCompressed(long compressedIdx) {
+        if (compressedIdx > lastMaxCompressedIdx)
+            lastMaxCompressedIdx = compressedIdx;
+
+        compressingSegments.remove(compressedIdx);
+
+        if (!compressingSegments.isEmpty())
+            this.lastCompressedIdx = Math.min(lastMaxCompressedIdx, 
compressingSegments.get(0) - 1);
+        else
+            this.lastCompressedIdx = lastMaxCompressedIdx;
     }
 
     /**
@@ -71,13 +108,8 @@ public class SegmentCompressStorage {
      * there's no segment to archive right now.
      */
     synchronized long nextSegmentToCompressOrWait() throws 
IgniteInterruptedCheckedException {
-        long segmentToCompress = lastCompressedIdx + 1;
-
         try {
-            while (
-                segmentToCompress > 
segmentArchivedStorage.lastArchivedAbsoluteIndex()
-                    && !interrupted
-                )
+            while (segmentsToCompress.peek() == null && !interrupted)
                 wait();
         }
         catch (InterruptedException e) {
@@ -86,7 +118,11 @@ public class SegmentCompressStorage {
 
         checkInterrupted();
 
-        return segmentToCompress;
+        Long idx = segmentsToCompress.poll();
+
+        compressingSegments.add(idx);
+
+        return idx == null ? -1L : idx;
     }
 
     /**
@@ -110,7 +146,23 @@ public class SegmentCompressStorage {
      * Callback for waking up compressor when new segment is archived.
      */
     private synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
+        while (lastEnqueuedToCompressIdx < lastAbsArchivedIdx && 
compactionEnabled)
+            segmentsToCompress.add(++lastEnqueuedToCompressIdx);
+
         notifyAll();
     }
 
+    /**
+     * @param idx Minimum raw segment index that should be preserved from 
deletion.
+     */
+    void keepUncompressedIdxFrom(long idx) {
+        minUncompressedIdxToKeep = idx;
+    }
+
+    /**
+     * @return Minimum raw segment index that should be preserved from 
deletion.
+     */
+    long keepUncompressedIdxFrom() {
+        return minUncompressedIdxToKeep;
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsWithCompactionTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsWithCompactionTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsWithCompactionTest.java
new file mode 100644
index 0000000..bc34f29
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsReserveWalSegmentsWithCompactionTest.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
+
+/**
+ * Runs {@link IgnitePdsReserveWalSegmentsTest} with WAL compaction enabled.
+ */
+public class IgnitePdsReserveWalSegmentsWithCompactionTest extends 
IgnitePdsReserveWalSegmentsTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        cfg.getDataStorageConfiguration().setWalCompactionEnabled(true);
+
+        return cfg;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
index 811a231..df89419 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java
@@ -68,7 +68,7 @@ public class NoOpWALManager implements 
IgniteWriteAheadLogManager {
     }
 
     /** {@inheritDoc} */
-    @Override public boolean reserve(WALPointer start) throws 
IgniteCheckedException {
+    @Override public boolean reserve(WALPointer start) {
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
index 8287684..7840b09 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAwareTest.java
@@ -31,13 +31,12 @@ import static org.junit.Assert.assertThat;
  * Test for {@link SegmentAware}.
  */
 public class SegmentAwareTest extends TestCase {
-
     /**
      * Waiting finished when work segment is set.
      */
     public void testFinishAwaitSegment_WhenExactWaitingSegmentWasSet() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
 
@@ -53,7 +52,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishAwaitSegment_WhenGreaterThanWaitingSegmentWasSet() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
 
@@ -69,7 +68,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishAwaitSegment_WhenNextSegmentEqualToWaitingOne() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
 
@@ -91,7 +90,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishAwaitSegment_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         IgniteInternalFuture future = awaitThread(() -> aware.awaitSegment(5));
 
@@ -107,7 +106,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishWaitSegmentForArchive_WhenWorkSegmentIncremented() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         aware.curAbsWalIdx(5);
         aware.setLastArchivedAbsoluteIndex(4);
@@ -126,7 +125,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishWaitSegmentForArchive_WhenWorkSegmentGreaterValue() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         aware.curAbsWalIdx(5);
         aware.setLastArchivedAbsoluteIndex(4);
@@ -145,7 +144,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishWaitSegmentForArchive_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         aware.curAbsWalIdx(5);
         aware.setLastArchivedAbsoluteIndex(4);
@@ -164,7 +163,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testCorrectCalculateNextSegmentIndex() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         aware.curAbsWalIdx(5);
 
@@ -180,7 +179,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void 
testFinishWaitNextAbsoluteIndex_WhenMarkAsArchivedFirstSegment() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(2);
+        SegmentAware aware = new SegmentAware(2, false);
 
         aware.curAbsWalIdx(1);
         aware.setLastArchivedAbsoluteIndex(-1);
@@ -199,7 +198,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishWaitNextAbsoluteIndex_WhenSetToArchivedFirst() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(2);
+        SegmentAware aware = new SegmentAware(2, false);
 
         aware.curAbsWalIdx(1);
         aware.setLastArchivedAbsoluteIndex(-1);
@@ -218,7 +217,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void 
testFinishWaitNextAbsoluteIndex_WhenOnlyForceInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(2);
+        SegmentAware aware = new SegmentAware(2, false);
 
         aware.curAbsWalIdx(2);
         aware.setLastArchivedAbsoluteIndex(-1);
@@ -243,7 +242,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishSegmentArchived_WhenSetExactWaitingSegment() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
 
@@ -257,9 +256,9 @@ public class SegmentAwareTest extends TestCase {
     /**
      * Waiting finished when segment archived.
      */
-    public void testFinishSegmentArchived_WhenMarkExactWatingSegment() throws 
IgniteCheckedException, InterruptedException {
+    public void testFinishSegmentArchived_WhenMarkExactWaitingSegment() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
 
@@ -273,9 +272,9 @@ public class SegmentAwareTest extends TestCase {
     /**
      * Waiting finished when segment archived.
      */
-    public void testFinishSegmentArchived_WhenSetGreaterThanWatingSegment() 
throws IgniteCheckedException, InterruptedException {
+    public void testFinishSegmentArchived_WhenSetGreaterThanWaitingSegment() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
 
@@ -289,9 +288,9 @@ public class SegmentAwareTest extends TestCase {
     /**
      * Waiting finished when segment archived.
      */
-    public void testFinishSegmentArchived_WhenMarkGreaterThanWatingSegment() 
throws IgniteCheckedException, InterruptedException {
+    public void testFinishSegmentArchived_WhenMarkGreaterThanWaitingSegment() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         IgniteInternalFuture future = awaitThread(() -> 
aware.awaitSegmentArchived(5));
 
@@ -307,7 +306,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishSegmentArchived_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         aware.curAbsWalIdx(5);
         aware.setLastArchivedAbsoluteIndex(4);
@@ -326,7 +325,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testMarkAsMovedToArchive_WhenReleaseLockedSegment() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         aware.checkCanReadArchiveOrReserveWorkSegment(5);
 
@@ -344,7 +343,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testMarkAsMovedToArchive_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
         aware.checkCanReadArchiveOrReserveWorkSegment(5);
 
         IgniteInternalFuture future = awaitThread(() -> 
aware.markAsMovedToArchive(5));
@@ -364,9 +363,9 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishWaitSegmentToCompress_WhenSetLastArchivedSegment() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, true);
 
-        aware.lastCompressedIdx(5);
+        aware.onSegmentCompressed(5);
 
         IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentToCompress);
 
@@ -382,9 +381,9 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishWaitSegmentToCompress_WhenMarkLastArchivedSegment() 
throws IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, true);
 
-        aware.lastCompressedIdx(5);
+        aware.onSegmentCompressed(5);
 
         IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentToCompress);
 
@@ -400,9 +399,9 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testCorrectCalculateNextCompressSegment() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, true);
 
-        aware.lastCompressedIdx(5);
+        aware.onSegmentCompressed(5);
         aware.setLastArchivedAbsoluteIndex(6);
         aware.lastTruncatedArchiveIdx(7);
 
@@ -418,8 +417,8 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testFinishWaitSegmentToCompress_WhenInterruptWasCall() throws 
IgniteCheckedException, InterruptedException {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
-        aware.lastCompressedIdx(5);
+        SegmentAware aware = new SegmentAware(10, true);
+        aware.onSegmentCompressed(5);
 
         IgniteInternalFuture future = 
awaitThread(aware::waitNextSegmentToCompress);
 
@@ -431,11 +430,34 @@ public class SegmentAwareTest extends TestCase {
     }
 
     /**
+     * Tests that {@link SegmentAware#lastCompressedIdx()} advances only past 
segments whose predecessors are all compressed, even if compression finishes out of order.
+     */
+    public void testLastCompressedIdxProperOrdering() throws 
IgniteInterruptedCheckedException {
+        SegmentAware aware = new SegmentAware(10, true);
+
+        for (int i = 0; i < 5 ; i++) {
+            aware.setLastArchivedAbsoluteIndex(i);
+            aware.waitNextSegmentToCompress();
+        }
+
+        aware.onSegmentCompressed(0);
+
+        aware.onSegmentCompressed(4);
+        assertEquals(0, aware.lastCompressedIdx());
+        aware.onSegmentCompressed(1);
+        assertEquals(1, aware.lastCompressedIdx());
+        aware.onSegmentCompressed(3);
+        assertEquals(1, aware.lastCompressedIdx());
+        aware.onSegmentCompressed(2);
+        assertEquals(4, aware.lastCompressedIdx());
+    }
+
+    /**
      * Segment reserve correctly.
      */
     public void testReserveCorrectly() {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         //when: reserve one segment twice and one segment once.
         aware.reserve(5);
@@ -478,7 +500,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testAssertFail_WhenReleaseUnreservedSegment() {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         aware.reserve(5);
         try {
@@ -497,7 +519,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testReserveWorkSegmentCorrectly() {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         //when: lock one segment twice.
         aware.checkCanReadArchiveOrReserveWorkSegment(5);
@@ -530,7 +552,7 @@ public class SegmentAwareTest extends TestCase {
      */
     public void testAssertFail_WhenReleaseUnreservedWorkSegment() {
         //given: thread which awaited segment.
-        SegmentAware aware = new SegmentAware(10);
+        SegmentAware aware = new SegmentAware(10, false);
 
         aware.checkCanReadArchiveOrReserveWorkSegment(5);
         try {

http://git-wip-us.apache.org/repos/asf/ignite/blob/036bd074/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index a9f2601..7631834 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -34,6 +34,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOf
 import 
org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOnlineNodeOutOfBaselineFullApiSelfTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsReserveWalSegmentsWithCompactionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest;
@@ -156,6 +157,7 @@ public class IgnitePdsTestSuite2 extends TestSuite {
         suite.addTestSuite(IgnitePdsExchangeDuringCheckpointTest.class);
 
         suite.addTestSuite(IgnitePdsReserveWalSegmentsTest.class);
+        
suite.addTestSuite(IgnitePdsReserveWalSegmentsWithCompactionTest.class);
 
         // new style folders with generated consistent ID test
         suite.addTestSuite(IgniteUidAsConsistentIdMigrationTest.class);

Reply via email to