http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index 3d0b8b9..d7047cf 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -91,6 +91,10 @@ import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStor
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
@@ -260,6 +264,9 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
     /** Factory to provide I/O interfaces for read/write operations with files 
*/
     private volatile FileIOFactory ioFactory;
 
+    /** Factory to provide I/O interfaces for read primitives with files */
+    private final SegmentFileInputFactory segmentFileInputFactory;
+
     /** Updater for {@link #currentHnd}, used for verify there are no 
concurrent update for current log segment handle */
     private static final 
AtomicReferenceFieldUpdater<FsyncModeFileWriteAheadLogManager, FileWriteHandle> 
currentHndUpd =
         
AtomicReferenceFieldUpdater.newUpdater(FsyncModeFileWriteAheadLogManager.class, 
FileWriteHandle.class, "currentHnd");
@@ -342,6 +349,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
         fsyncDelay = dsCfg.getWalFsyncDelayNanos();
         alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages();
         ioFactory = dsCfg.getFileIOFactory();
+        segmentFileInputFactory = new SimpleSegmentFileInputFactory();
         walAutoArchiveAfterInactivity = 
dsCfg.getWalAutoArchiveAfterInactivity();
         evt = ctx.event();
 
@@ -769,7 +777,8 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             ioFactory,
             archiver,
             decompressor,
-            log
+            log,
+            segmentFileInputFactory
         );
     }
 
@@ -831,7 +840,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
         FileWriteHandle cur = currentHnd;
 
-        return cur != null && cur.idx >= absIdx;
+        return cur != null && cur.getSegmentId() >= absIdx;
     }
 
     /** {@inheritDoc} */
@@ -1119,9 +1128,9 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             if (metrics.metricsEnabled())
                 metrics.onWallRollOver();
 
-            FileWriteHandle next = initNextWriteHandle(cur.idx);
+            FileWriteHandle next = initNextWriteHandle(cur.getSegmentId());
 
-            if (next.idx - lashCheckpointFileIdx() >= 
maxSegCountWithoutCheckpoint)
+            if (next.getSegmentId() - lashCheckpointFileIdx() >= 
maxSegCountWithoutCheckpoint)
                 cctx.database().forceCheckpoint("too big size of WAL without 
checkpoint");
 
             boolean swapped = currentHndUpd.compareAndSet(this, hnd, next);
@@ -1165,7 +1174,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
         int len = lastReadPtr == null ? 0 : lastReadPtr.length();
 
         try {
-            FileIO fileIO = ioFactory.create(curFile);
+            SegmentIO fileIO = new SegmentIO(absIdx, 
ioFactory.create(curFile));
 
             try {
                 int serVer = serializerVersion;
@@ -1173,7 +1182,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                 // If we have existing segment, try to read version from it.
                 if (lastReadPtr != null) {
                     try {
-                        serVer = readSegmentHeader(fileIO, 
absIdx).getSerializerVersion();
+                        serVer = readSegmentHeader(fileIO, 
segmentFileInputFactory).getSerializerVersion();
                     }
                     catch (SegmentEofException | EOFException ignore) {
                         serVer = serializerVersion;
@@ -1188,7 +1197,6 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
                 FileWriteHandle hnd = new FileWriteHandle(
                     fileIO,
-                    absIdx,
                     offset + len,
                     maxWalSegmentSize,
                     ser);
@@ -1238,11 +1246,10 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             if (log.isDebugEnabled())
                 log.debug("Switching to a new WAL segment: " + 
nextFile.getAbsolutePath());
 
-            FileIO fileIO = ioFactory.create(nextFile);
+            SegmentIO fileIO = new SegmentIO(curIdx + 1, 
ioFactory.create(nextFile));
 
             FileWriteHandle hnd = new FileWriteHandle(
                 fileIO,
-                curIdx + 1,
                 0,
                 maxWalSegmentSize,
                 serializer);
@@ -1886,7 +1893,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             FileDescriptor[] alreadyCompressed = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
 
             if (alreadyCompressed.length > 0)
-                lastCompressedIdx = alreadyCompressed[alreadyCompressed.length 
- 1].getIdx();
+                lastCompressedIdx = alreadyCompressed[alreadyCompressed.length 
- 1].idx();
         }
 
         /**
@@ -2039,7 +2046,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             int segmentSerializerVer;
 
             try (FileIO fileIO = ioFactory.create(raw)) {
-                segmentSerializerVer = readSegmentHeader(fileIO, 
nextSegment).getSerializerVersion();
+                segmentSerializerVer = readSegmentHeader(new 
SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion();
             }
 
             try (ZipOutputStream zos = new ZipOutputStream(new 
BufferedOutputStream(new FileOutputStream(zip)))) {
@@ -2326,18 +2333,20 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
      */
     private abstract static class FileHandle {
         /** I/O interface for read/write operations with file */
-        protected FileIO fileIO;
-
-        /** Absolute WAL segment file index (incremental counter) */
-        protected final long idx;
+        protected SegmentIO fileIO;
 
         /**
          * @param fileIO I/O interface for read/write operations of FileHandle.
-         * @param idx Absolute WAL segment file index (incremental counter).
          */
-        private FileHandle(FileIO fileIO, long idx) {
+        private FileHandle(SegmentIO fileIO) {
             this.fileIO = fileIO;
-            this.idx = idx;
+        }
+
+        /**
+         * @return Current segment id.
+         */
+        public long getSegmentId(){
+            return fileIO.getSegmentId();
         }
     }
 
@@ -2359,17 +2368,15 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
         /**
          * @param fileIO I/O interface for read/write operations of FileHandle.
-         * @param idx Absolute WAL segment file index (incremental counter).
          * @param ser Entry serializer.
          * @param in File input.
          */
         ReadFileHandle(
-                FileIO fileIO,
-                long idx,
-                RecordSerializer ser,
-                FileInput in
+            SegmentIO fileIO,
+            RecordSerializer ser,
+            FileInput in
         ) {
-            super(fileIO, idx);
+            super(fileIO);
 
             this.ser = ser;
             this.in = in;
@@ -2389,7 +2396,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
         /** {@inheritDoc} */
         @Override public long idx() {
-            return idx;
+            return getSegmentId();
         }
 
         /** {@inheritDoc} */
@@ -2456,20 +2463,18 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
         /**
          * @param fileIO I/O file interface to use
-         * @param idx Absolute WAL segment file index for easy access.
          * @param pos Position.
          * @param maxSegmentSize Max segment size.
          * @param serializer Serializer.
          * @throws IOException If failed.
          */
         private FileWriteHandle(
-            FileIO fileIO,
-            long idx,
+            SegmentIO fileIO,
             long pos,
             long maxSegmentSize,
             RecordSerializer serializer
         ) throws IOException {
-            super(fileIO, idx);
+            super(fileIO);
 
             assert serializer != null;
 
@@ -2478,7 +2483,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             this.maxSegmentSize = maxSegmentSize;
             this.serializer = serializer;
 
-            head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), 
false));
+            head.set(new FakeRecord(new FileWALPointer(fileIO.getSegmentId(), 
(int)pos, 0), false));
             written = pos;
             lastFsyncPos = pos;
         }
@@ -2494,15 +2499,15 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                 assert fileIO.position() == 0 : "Serializer version can be 
written only at the begin of file " +
                     fileIO.position();
 
-                long updatedPosition = 
FsyncModeFileWriteAheadLogManager.writeSerializerVersion(fileIO, idx,
+                long updatedPosition = 
FsyncModeFileWriteAheadLogManager.writeSerializerVersion(fileIO, getSegmentId(),
                     serializer.version(), mode);
 
                 written = updatedPosition;
                 lastFsyncPos = updatedPosition;
-                head.set(new FakeRecord(new FileWALPointer(idx, 
(int)updatedPosition, 0), false));
+                head.set(new FakeRecord(new FileWALPointer(getSegmentId(), 
(int)updatedPosition, 0), false));
             }
             catch (IOException e) {
-                throw new IOException("Unable to write serializer version for 
segment " + idx, e);
+                throw new IOException("Unable to write serializer version for 
segment " + getSegmentId(), e);
             }
         }
 
@@ -2558,7 +2563,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                 rec.previous(h);
 
                 FileWALPointer ptr = new FileWALPointer(
-                    idx,
+                    getSegmentId(),
                     (int)nextPos,
                     rec.size());
 
@@ -2588,7 +2593,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
 
             if (ptr != null) {
                 // If requested obsolete file index, it must be already 
flushed by close.
-                if (ptr.index() != idx)
+                if (ptr.index() != getSegmentId())
                     return;
 
                 expWritten = ptr.fileOffset();
@@ -2647,7 +2652,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                 }
             }
 
-            assert ptr.index() == idx;
+            assert ptr.index() == getSegmentId();
 
             for (; ; ) {
                 WALRecord h = head.get();
@@ -2684,7 +2689,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             // Fail-fast before CAS.
             checkNode();
 
-            if (!head.compareAndSet(expHead, new FakeRecord(new 
FileWALPointer(idx, (int)nextPosition(expHead), 0), stop)))
+            if (!head.compareAndSet(expHead, new FakeRecord(new 
FileWALPointer(getSegmentId(), (int)nextPosition(expHead), 0), stop)))
                 return false;
 
             if (expHead.chainSize() == 0)
@@ -2782,7 +2787,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             // If index has changed, it means that the log was rolled over and 
already sync'ed.
             // If requested position is smaller than last sync'ed, it also 
means all is good.
             // If position is equal, then our record is the last not synced.
-            return idx == ptr.index() && lastFsyncPos <= ptr.fileOffset();
+            return getSegmentId() == ptr.index() && lastFsyncPos <= 
ptr.fileOffset();
         }
 
         /**
@@ -2792,7 +2797,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             lock.lock();
 
             try {
-                return new FileWALPointer(idx, (int)written, 0);
+                return new FileWALPointer(getSegmentId(), (int)written, 0);
             }
             finally {
                 lock.unlock();
@@ -2881,7 +2886,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                             if (rollOver && written < (maxSegmentSize - 
switchSegmentRecSize)) {
                                 final ByteBuffer buf = 
ByteBuffer.allocate(switchSegmentRecSize);
 
-                                segmentRecord.position(new FileWALPointer(idx, 
(int)written, switchSegmentRecSize));
+                                segmentRecord.position(new 
FileWALPointer(getSegmentId(), (int)written, switchSegmentRecSize));
                                 backwardSerializer.writeRecord(segmentRecord, 
buf);
 
                                 buf.rewind();
@@ -2904,11 +2909,11 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                         }
                     }
                     catch (IOException e) {
-                        throw new StorageException("Failed to close WAL write 
handle [idx=" + idx + "]", e);
+                        throw new StorageException("Failed to close WAL write 
handle [idx=" + getSegmentId() + "]", e);
                     }
 
                     if (log.isDebugEnabled())
-                        log.debug("Closed WAL write handle [idx=" + idx + "]");
+                        log.debug("Closed WAL write handle [idx=" + 
getSegmentId() + "]");
 
                     return true;
                 }
@@ -2943,7 +2948,7 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
                         fileIO.close();
                     }
                     catch (IOException e) {
-                        U.error(log, "Failed to close WAL file [idx=" + idx + 
", fileIO=" + fileIO + "]", e);
+                        U.error(log, "Failed to close WAL file [idx=" + 
getSegmentId() + ", fileIO=" + fileIO + "]", e);
                     }
                 }
 
@@ -3160,7 +3165,8 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
          * @param serializerFactory Serializer factory.
          * @param archiver Archiver.
          * @param decompressor Decompressor.
-         *@param log Logger  @throws IgniteCheckedException If failed to 
initialize WAL segment.
+         * @param log Logger  @throws IgniteCheckedException If failed to 
initialize WAL segment.
+         * @param segmentFileInputFactory Segment file input factory.
          */
         private RecordsIterator(
             GridCacheSharedContext cctx,
@@ -3173,13 +3179,17 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             FileIOFactory ioFactory,
             FileArchiver archiver,
             FileDecompressor decompressor,
-            IgniteLogger log
+            IgniteLogger log,
+            SegmentFileInputFactory segmentFileInputFactory
         ) throws IgniteCheckedException {
-            super(log,
+            super(
+                log,
                 cctx,
                 serializerFactory,
                 ioFactory,
-                psCfg.getWalRecordIteratorBufferSize());
+                psCfg.getWalRecordIteratorBufferSize(),
+                segmentFileInputFactory
+            );
             this.walWorkDir = walWorkDir;
             this.walArchiveDir = walArchiveDir;
             this.psCfg = psCfg;
@@ -3401,9 +3411,9 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
         }
 
         /** {@inheritDoc} */
-        @Override protected AbstractReadFileHandle createReadFileHandle(FileIO 
fileIO, long idx,
+        @Override protected AbstractReadFileHandle 
createReadFileHandle(SegmentIO fileIO,
             RecordSerializer ser, FileInput in) {
-            return new ReadFileHandle(fileIO, idx, ser, in);
+            return new ReadFileHandle(fileIO, ser, in);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java
deleted file mode 100644
index 81ecd41..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchivedMonitor.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
-
-/**
- * Next WAL segment archived monitor. Manages last archived index, allows to 
emulate archivation in no-archiver mode.
- * Monitor which is notified each time WAL segment is archived.
- */
-class SegmentArchivedMonitor {
-    /**
-     * Last archived file absolute index, 0-based. Write is quarded by {@code 
this}. Negative value indicates there are
-     * no segments archived.
-     */
-    private volatile long lastAbsArchivedIdx = -1;
-
-    /**
-     * @return Last archived segment absolute index.
-     */
-    long lastArchivedAbsoluteIndex() {
-        return lastAbsArchivedIdx;
-    }
-
-    /**
-     * @param lastAbsArchivedIdx new value of last archived segment index
-     */
-    synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
-        this.lastAbsArchivedIdx = lastAbsArchivedIdx;
-
-        notifyAll();
-    }
-
-    /**
-     * Method will wait activation of particular WAL segment index.
-     *
-     * @param awaitIdx absolute index  {@link #lastArchivedAbsoluteIndex()} to 
become true.
-     * @throws IgniteInterruptedCheckedException if interrupted.
-     */
-    synchronized void awaitSegmentArchived(long awaitIdx) throws 
IgniteInterruptedCheckedException {
-        while (lastArchivedAbsoluteIndex() < awaitIdx) {
-            try {
-                wait(2000);
-            }
-            catch (InterruptedException e) {
-                throw new IgniteInterruptedCheckedException(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java
deleted file mode 100644
index 12c4b4f..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentReservationStorage.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import java.util.NavigableMap;
-import java.util.TreeMap;
-
-/**
- * Segment reservations storage: Protects WAL segments from deletion during 
WAL log cleanup.
- */
-class SegmentReservationStorage {
-    /**
-     * Maps absolute segment index to reservation counter. If counter > 0 then 
we wouldn't delete all segments
-     * which has index >= reserved segment index. Guarded by {@code this}.
-     */
-    private NavigableMap<Long, Integer> reserved = new TreeMap<>();
-
-    /**
-     * @param absIdx Index for reservation.
-     */
-    synchronized void reserve(long absIdx) {
-        reserved.merge(absIdx, 1, (a, b) -> a + b);
-    }
-
-    /**
-     * Checks if segment is currently reserved (protected from deletion during 
WAL cleanup).
-     * @param absIdx Index for check reservation.
-     * @return {@code True} if index is reserved.
-     */
-    synchronized boolean reserved(long absIdx) {
-        return reserved.floorKey(absIdx) != null;
-    }
-
-    /**
-     * @param absIdx Reserved index.
-     */
-    synchronized void release(long absIdx) {
-        Integer cur = reserved.get(absIdx);
-
-        assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
-
-        if (cur == 1)
-            reserved.remove(absIdx);
-        else
-            reserved.put(absIdx, cur - 1);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java
new file mode 100644
index 0000000..2d47e9d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentRouter.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor.fileName;
+
+/**
+ * Class for manage of segment file location.
+ */
+public class SegmentRouter {
+    /** */
+    public static final String ZIP_SUFFIX = ".zip";
+    /** */
+    private File walWorkDir;
+
+    /** WAL archive directory (including consistent ID as subfolder) */
+    private File walArchiveDir;
+
+    /** Holder of actual information of latest manipulation on WAL segments. */
+    private SegmentAware segmentAware;
+
+    /** */
+    private DataStorageConfiguration dsCfg;
+
+    /**
+     * @param walWorkDir WAL work directory.
+     * @param walArchiveDir WAL archive directory.
+     * @param segmentAware Holder of actual information of latest manipulation 
on WAL segments.
+     * @param dsCfg Data storage configuration.
+     */
+    public SegmentRouter(
+        File walWorkDir,
+        File walArchiveDir,
+        SegmentAware segmentAware,
+        DataStorageConfiguration dsCfg) {
+        this.walWorkDir = walWorkDir;
+        this.walArchiveDir = walArchiveDir;
+        this.segmentAware = segmentAware;
+        this.dsCfg = dsCfg;
+    }
+
+    /**
+     * Find file which represent given segment.
+     *
+     * @param segmentId Segment for searching.
+     * @return Actual file description.
+     * @throws FileNotFoundException If file does not exist.
+     */
+    public FileDescriptor findSegment(long segmentId) throws 
FileNotFoundException {
+        FileDescriptor fd;
+
+        if (segmentAware.lastArchivedAbsoluteIndex() >= segmentId)
+            fd = new FileDescriptor(new File(walArchiveDir, 
fileName(segmentId)));
+        else
+            fd = new FileDescriptor(new File(walWorkDir, fileName(segmentId % 
dsCfg.getWalSegments())), segmentId);
+
+        if (!fd.file().exists()) {
+            FileDescriptor zipFile = new FileDescriptor(new 
File(walArchiveDir, fileName(fd.idx()) + ZIP_SUFFIX));
+
+            if (!zipFile.file().exists()) {
+                throw new FileNotFoundException("Both compressed and raw 
segment files are missing in archive " +
+                    "[segmentIdx=" + fd.idx() + "]");
+            }
+
+            fd = zipFile;
+        }
+
+        return fd;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
index a42eb89..8d1445c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SingleSegmentLogicalRecordsIterator.java
@@ -25,8 +25,10 @@ import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.record.RecordTypes;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
@@ -71,7 +73,7 @@ public class SingleSegmentLogicalRecordsIterator extends 
AbstractWalRecordsItera
         File archiveDir,
         CIX1<WALRecord> advanceC
     ) throws IgniteCheckedException {
-        super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), 
ioFactory, bufSize);
+        super(log, sharedCtx, initLogicalRecordsSerializerFactory(sharedCtx), 
ioFactory, bufSize, new SimpleSegmentFileInputFactory());
 
         curWalSegmIdx = archivedSegIdx;
         this.archiveDir = archiveDir;
@@ -121,9 +123,10 @@ public class SingleSegmentLogicalRecordsIterator extends 
AbstractWalRecordsItera
     }
 
     /** {@inheritDoc} */
-    @Override protected AbstractReadFileHandle createReadFileHandle(FileIO 
fileIO, long idx,
+    @Override protected AbstractReadFileHandle createReadFileHandle(
+        SegmentIO fileIO,
         RecordSerializer ser, FileInput in) {
-        return new FileWriteAheadLogManager.ReadFileHandle(fileIO, idx, ser, 
in);
+        return new FileWriteAheadLogManager.ReadFileHandle(fileIO, ser, in, 
null);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
new file mode 100644
index 0000000..1ed607e
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentArchivedStorage.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+
+/**
+ * Manages last archived index, allows to emulate archivation in no-archiver 
mode. Monitor which is notified each time
+ * WAL segment is archived.
+ *
+ * Class for inner usage.
+ */
+class SegmentArchivedStorage extends SegmentObservable {
+    /** Segment lock storage: Protects WAL work segments from moving. */
+    private final SegmentLockStorage segmentLockStorage;
+    /** Flag of interrupt waiting on this object. */
+    private volatile boolean interrupted;
+    /**
+     * Last archived file absolute index, 0-based. Write is quarded by {@code 
this}. Negative value indicates there are
+     * no segments archived.
+     */
+    private volatile long lastAbsArchivedIdx = -1;
+
+    /**
+     * @param segmentLockStorage Protects WAL work segments from moving.
+     */
+    private SegmentArchivedStorage(SegmentLockStorage segmentLockStorage) {
+        this.segmentLockStorage = segmentLockStorage;
+    }
+
+    /**
+     * @param segmentLockStorage Protects WAL work segments from moving.
+     */
+    static SegmentArchivedStorage buildArchivedStorage(SegmentLockStorage 
segmentLockStorage) {
+        SegmentArchivedStorage archivedStorage = new 
SegmentArchivedStorage(segmentLockStorage);
+
+        segmentLockStorage.addObserver(archivedStorage::onSegmentUnlocked);
+
+        return archivedStorage;
+    }
+
+    /**
+     * @return Last archived segment absolute index.
+     */
+    long lastArchivedAbsoluteIndex() {
+        return lastAbsArchivedIdx;
+    }
+
+    /**
+     * @param lastAbsArchivedIdx New value of last archived segment index.
+     */
+    synchronized void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
+        this.lastAbsArchivedIdx = lastAbsArchivedIdx;
+
+        notifyAll();
+
+        notifyObservers(lastAbsArchivedIdx);
+    }
+
+    /**
+     * Method will wait activation of particular WAL segment index.
+     *
+     * @param awaitIdx absolute index  {@link #lastArchivedAbsoluteIndex()} to 
become true.
+     * @throws IgniteInterruptedCheckedException if interrupted.
+     */
+    synchronized void awaitSegmentArchived(long awaitIdx) throws 
IgniteInterruptedCheckedException {
+        while (lastArchivedAbsoluteIndex() < awaitIdx && !interrupted) {
+            try {
+                wait(2000);
+            }
+            catch (InterruptedException e) {
+                throw new IgniteInterruptedCheckedException(e);
+            }
+        }
+
+        checkInterrupted();
+    }
+
+    /**
+     * Mark segment as moved to archive under lock.
+     *
+     * @param toArchive Segment which was should be moved to archive.
+     * @throws IgniteInterruptedCheckedException if interrupted during waiting.
+     */
+    synchronized void markAsMovedToArchive(long toArchive) throws 
IgniteInterruptedCheckedException {
+        try {
+            while (segmentLockStorage.locked(toArchive) && !interrupted)
+                wait();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedCheckedException(e);
+        }
+
+        //Ignore interrupted flag and force set new value. - legacy logic.
+        //checkInterrupted();
+
+        setLastArchivedAbsoluteIndex(toArchive);
+    }
+
+    /**
+     * Interrupt waiting on this object.
+     */
+    synchronized void interrupt() {
+        interrupted = true;
+
+        notifyAll();
+    }
+
+    /**
+     * Check for interrupt flag was set.
+     */
+    private void checkInterrupted() throws IgniteInterruptedCheckedException {
+        if (interrupted)
+            throw new IgniteInterruptedCheckedException("Interrupt waiting of 
change archived idx");
+    }
+
+    /**
+     * Callback for waking up waiters of this object when unlocked happened.
+     */
+    private synchronized void onSegmentUnlocked(long segmentId) {
+        notifyAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
new file mode 100644
index 0000000..3379b74
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentAware.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentArchivedStorage.buildArchivedStorage;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCompressStorage.buildCompressStorage;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentCurrentStateStorage.buildCurrentStateStorage;
+
+/**
+ * Holder of actual information of latest manipulation on WAL segments.
+ */
+public class SegmentAware {
+    /** Latest truncated segment. */
+    private volatile long lastTruncatedArchiveIdx = -1L;
+    /** Segment reservations storage: Protects WAL segments from deletion 
during WAL log cleanup. */
+    private final SegmentReservationStorage reservationStorage = new 
SegmentReservationStorage();
+    /** Lock on segment protects from archiving segment. */
+    private final SegmentLockStorage segmentLockStorage = new 
SegmentLockStorage();
+    /** Manages last archived index, emulates archivation in no-archiver mode. 
*/
+    private final SegmentArchivedStorage segmentArchivedStorage = 
buildArchivedStorage(segmentLockStorage);
+    /** Storage of actual information about current index of compressed 
segments. */
+    private final SegmentCompressStorage segmentCompressStorage = 
buildCompressStorage(segmentArchivedStorage);
+    /** Storage of absolute current segment index. */
+    private final SegmentCurrentStateStorage segmentCurrStateStorage;
+
+    /**
+     * @param walSegmentsCnt Total WAL segments count.
+     */
+    public SegmentAware(int walSegmentsCnt) {
+        segmentCurrStateStorage = buildCurrentStateStorage(walSegmentsCnt, 
segmentArchivedStorage);
+    }
+
+    /**
+     * Waiting until current WAL index will be greater or equal than given one.
+     *
+     * @param absSegIdx Target WAL index.
+     */
+    public void awaitSegment(long absSegIdx) throws 
IgniteInterruptedCheckedException {
+        segmentCurrStateStorage.awaitSegment(absSegIdx);
+    }
+
+    /**
+     * Calculate next segment index or wait if needed.
+     *
+     * @return Next absolute segment index.
+     */
+    public long nextAbsoluteSegmentIndex() throws 
IgniteInterruptedCheckedException {
+        return segmentCurrStateStorage.nextAbsoluteSegmentIndex();
+    }
+
+    /**
+     * @return Current WAL index.
+     */
+    public long curAbsWalIdx() {
+        return segmentCurrStateStorage.curAbsWalIdx();
+    }
+
+    /**
+     * Waiting until archivation of next segment will be allowed.
+     */
+    public long waitNextSegmentForArchivation() throws 
IgniteInterruptedCheckedException {
+        return segmentCurrStateStorage.waitNextSegmentForArchivation();
+    }
+
+    /**
+     * Mark segment as moved to archive under lock.
+     *
+     * @param toArchive Segment which was should be moved to archive.
+     * @throws IgniteInterruptedCheckedException if interrupted during waiting.
+     */
+    public void markAsMovedToArchive(long toArchive) throws 
IgniteInterruptedCheckedException {
+        segmentArchivedStorage.markAsMovedToArchive(toArchive);
+    }
+
+    /**
+     * Method will wait activation of particular WAL segment index.
+     *
+     * @param awaitIdx absolute index  {@link #lastArchivedAbsoluteIndex()} to 
become true.
+     * @throws IgniteInterruptedCheckedException if interrupted.
+     */
+    public void awaitSegmentArchived(long awaitIdx) throws 
IgniteInterruptedCheckedException {
+        segmentArchivedStorage.awaitSegmentArchived(awaitIdx);
+    }
+
+    /**
+     * Pessimistically tries to reserve segment for compression in order to 
avoid concurrent truncation. Waits if
+     * there's no segment to archive right now.
+     */
+    public long waitNextSegmentToCompress() throws 
IgniteInterruptedCheckedException {
+        return Math.max(segmentCompressStorage.nextSegmentToCompressOrWait(), 
lastTruncatedArchiveIdx + 1);
+    }
+
+    /**
+     * Force set last compressed segment.
+     *
+     * @param lastCompressedIdx Segment which was last compressed.
+     */
+    public void lastCompressedIdx(long lastCompressedIdx) {
+        segmentCompressStorage.lastCompressedIdx(lastCompressedIdx);
+    }
+
+    /**
+     * @return Last compressed segment.
+     */
+    public long lastCompressedIdx() {
+        return segmentCompressStorage.lastCompressedIdx();
+    }
+
+    /**
+     * Update current WAL index.
+     *
+     * @param curAbsWalIdx New current WAL index.
+     */
+    public void curAbsWalIdx(long curAbsWalIdx) {
+        segmentCurrStateStorage.curAbsWalIdx(curAbsWalIdx);
+    }
+
+    /**
+     * @param lastTruncatedArchiveIdx Last truncated segment;
+     */
+    public void lastTruncatedArchiveIdx(long lastTruncatedArchiveIdx) {
+        this.lastTruncatedArchiveIdx = lastTruncatedArchiveIdx;
+    }
+
+    /**
+     * @return Last truncated segment.
+     */
+    public long lastTruncatedArchiveIdx() {
+        return lastTruncatedArchiveIdx;
+    }
+
+    /**
+     * @param lastAbsArchivedIdx New value of last archived segment index.
+     */
+    public void setLastArchivedAbsoluteIndex(long lastAbsArchivedIdx) {
+        
segmentArchivedStorage.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
+    }
+
+    /**
+     * @return Last archived segment absolute index.
+     */
+    public long lastArchivedAbsoluteIndex() {
+        return segmentArchivedStorage.lastArchivedAbsoluteIndex();
+    }
+
+    /**
+     * @param absIdx Index for reservation.
+     */
+    public void reserve(long absIdx) {
+        reservationStorage.reserve(absIdx);
+    }
+
+    /**
+     * Checks if segment is currently reserved (protected from deletion during 
WAL cleanup).
+     *
+     * @param absIdx Index for check reservation.
+     * @return {@code True} if index is reserved.
+     */
+    public boolean reserved(long absIdx) {
+        return reservationStorage.reserved(absIdx);
+    }
+
+    /**
+     * @param absIdx Reserved index.
+     */
+    public void release(long absIdx) {
+        reservationStorage.release(absIdx);
+    }
+
+    /**
+     * Check if WAL segment locked (protected from move to archive)
+     *
+     * @param absIdx Index for check reservation.
+     * @return {@code True} if index is locked.
+     */
+    public boolean locked(long absIdx) {
+        return segmentLockStorage.locked(absIdx);
+    }
+
+    /**
+     * @param absIdx Segment absolute index.
+     * @return <ul><li>{@code True} if can read, no lock is held, 
</li><li>{@code false} if work segment, need release
+     * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
+     */
+    public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
+        return lastArchivedAbsoluteIndex() >= absIdx || 
segmentLockStorage.lockWorkSegment(absIdx);
+    }
+
+    /**
+     * @param absIdx Segment absolute index.
+     */
+    public void releaseWorkSegment(long absIdx) {
+        segmentLockStorage.releaseWorkSegment(absIdx);
+    }
+
+    /**
+     * Interrupt waiting on related objects.
+     */
+    public void interrupt() {
+        segmentArchivedStorage.interrupt();
+
+        segmentCompressStorage.interrupt();
+
+        segmentCurrStateStorage.interrupt();
+    }
+
+    /**
+     * Interrupt waiting on related objects.
+     */
+    public void forceInterrupt() {
+        segmentArchivedStorage.interrupt();
+
+        segmentCompressStorage.interrupt();
+
+        segmentCurrStateStorage.forceInterrupt();
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
new file mode 100644
index 0000000..30c9a2d
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCompressStorage.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+
+/**
+ * Storage of actual information about current index of compressed segments.
+ */
+public class SegmentCompressStorage {
+    /** Flag of interrupt waiting on this object. */
+    private volatile boolean interrupted;
+    /** Manages last archived index, emulates archivation in no-archiver mode. 
*/
+    private final SegmentArchivedStorage segmentArchivedStorage;
+    /** Last successfully compressed segment. */
+    private volatile long lastCompressedIdx = -1L;
+
+    /**
+     * @param segmentArchivedStorage Storage of last archived segment.
+     */
+    private SegmentCompressStorage(SegmentArchivedStorage 
segmentArchivedStorage) {
+        this.segmentArchivedStorage = segmentArchivedStorage;
+
+        this.segmentArchivedStorage.addObserver(this::onSegmentArchived);
+    }
+
+    /**
+     * @param segmentArchivedStorage Storage of last archived segment.
+     */
+    static SegmentCompressStorage buildCompressStorage(SegmentArchivedStorage 
segmentArchivedStorage) {
+        SegmentCompressStorage storage = new 
SegmentCompressStorage(segmentArchivedStorage);
+
+        segmentArchivedStorage.addObserver(storage::onSegmentArchived);
+
+        return storage;
+    }
+
+    /**
+     * Force set last compressed segment.
+     *
+     * @param lastCompressedIdx Segment which was last compressed.
+     */
+    void lastCompressedIdx(long lastCompressedIdx) {
+        this.lastCompressedIdx = lastCompressedIdx;
+    }
+
+    /**
+     * @return Last compressed segment.
+     */
+    long lastCompressedIdx() {
+        return lastCompressedIdx;
+    }
+
+    /**
+     * Pessimistically tries to reserve segment for compression in order to 
avoid concurrent truncation. Waits if
+     * there's no segment to archive right now.
+     */
+    synchronized long nextSegmentToCompressOrWait() throws 
IgniteInterruptedCheckedException {
+        long segmentToCompress = lastCompressedIdx + 1;
+
+        try {
+            while (
+                segmentToCompress > 
segmentArchivedStorage.lastArchivedAbsoluteIndex()
+                    && !interrupted
+                )
+                wait();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedCheckedException(e);
+        }
+
+        checkInterrupted();
+
+        return segmentToCompress;
+    }
+
+    /**
+     * Interrupt waiting on this object.
+     */
+    synchronized void interrupt() {
+        interrupted = true;
+
+        notifyAll();
+    }
+
+    /**
+     * Check for interrupt flag was set.
+     */
+    private void checkInterrupted() throws IgniteInterruptedCheckedException {
+        if (interrupted)
+            throw new IgniteInterruptedCheckedException("Interrupt waiting of 
change compressed idx");
+    }
+
+    /**
+     * Callback for waking up compressor when new segment is archived.
+     */
+    private synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
+        notifyAll();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
new file mode 100644
index 0000000..5761ef9
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentCurrentStateStorage.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+
+/**
+ * Storage of absolute current segment index.
+ */
+class SegmentCurrentStateStorage {
+    /** Flag of interrupt of waiting on this object. */
+    private volatile boolean interrupted;
+    /** Flag of force interrupt of waiting on this object. Needed for 
uninterrupted waiters. */
+    private volatile boolean forceInterrupted;
+    /** Total WAL segments count. */
+    private final int walSegmentsCnt;
+    /** Manages last archived index, emulates archivation in no-archiver mode. 
*/
+    private final SegmentArchivedStorage segmentArchivedStorage;
+    /**
+     * Absolute current segment index WAL Manager writes to. Guarded by 
<code>this</code>. Incremented during rollover.
+     * Also may be directly set if WAL is resuming logging after start.
+     */
+    private volatile long curAbsWalIdx = -1;
+
+    /**
+     * @param walSegmentsCnt Total WAL segments count.
+     * @param segmentArchivedStorage Last archived segment storage.
+     */
+    private SegmentCurrentStateStorage(int walSegmentsCnt, 
SegmentArchivedStorage segmentArchivedStorage) {
+        this.walSegmentsCnt = walSegmentsCnt;
+        this.segmentArchivedStorage = segmentArchivedStorage;
+    }
+
+    /**
+     * @param walSegmentsCnt Total WAL segments count.
+     * @param segmentArchivedStorage Last archived segment storage.
+     */
+    static SegmentCurrentStateStorage buildCurrentStateStorage(
+        int walSegmentsCnt,
+        SegmentArchivedStorage segmentArchivedStorage
+    ) {
+
+        SegmentCurrentStateStorage currStorage = new 
SegmentCurrentStateStorage(walSegmentsCnt, segmentArchivedStorage);
+
+        segmentArchivedStorage.addObserver(currStorage::onSegmentArchived);
+
+        return currStorage;
+    }
+
+    /**
+     * Waiting until current WAL index will be greater or equal than given one.
+     *
+     * @param absSegIdx Target WAL index.
+     */
+    synchronized void awaitSegment(long absSegIdx) throws 
IgniteInterruptedCheckedException {
+        try {
+            while (curAbsWalIdx < absSegIdx && !interrupted)
+                wait();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedCheckedException(e);
+        }
+
+        checkInterrupted();
+    }
+
+    /**
+     * Waiting until archivation of next segment will be allowed.
+     */
+    synchronized long waitNextSegmentForArchivation() throws 
IgniteInterruptedCheckedException {
+        long lastArchivedSegment = 
segmentArchivedStorage.lastArchivedAbsoluteIndex();
+
+        //We can archive segment if it less than current work segment so for 
archivate lastArchiveSegment + 1
+        // we should be ensure that currentWorkSegment = lastArchiveSegment + 2
+        awaitSegment(lastArchivedSegment + 2);
+
+        return lastArchivedSegment + 1;
+    }
+
+    /**
+     * Calculate next segment index or wait if needed. Uninterrupted waiting. 
- for force interrupt used
+     * forceInterrupted flag.
+     *
+     * @return Next absolute segment index.
+     */
+    synchronized long nextAbsoluteSegmentIndex() throws 
IgniteInterruptedCheckedException {
+        curAbsWalIdx++;
+
+        notifyAll();
+
+        try {
+            while (curAbsWalIdx - 
segmentArchivedStorage.lastArchivedAbsoluteIndex() > walSegmentsCnt && 
!forceInterrupted)
+                wait();
+        }
+        catch (InterruptedException e) {
+            throw new IgniteInterruptedCheckedException(e);
+        }
+
+        if (forceInterrupted)
+            throw new IgniteInterruptedCheckedException("Interrupt waiting of 
change archived idx");
+
+        return curAbsWalIdx;
+    }
+
+    /**
+     * Update current WAL index.
+     *
+     * @param curAbsWalIdx New current WAL index.
+     */
+    synchronized void curAbsWalIdx(long curAbsWalIdx) {
+        this.curAbsWalIdx = curAbsWalIdx;
+
+        notifyAll();
+    }
+
+    /**
+     * @return Current WAL index.
+     */
+    long curAbsWalIdx() {
+        return this.curAbsWalIdx;
+    }
+
+    /**
+     * Interrupt waiting on this object.
+     */
+    synchronized void interrupt() {
+        interrupted = true;
+
+        notifyAll();
+    }
+
+    /**
+     * Interrupt waiting on this object.
+     */
+    synchronized void forceInterrupt() {
+        interrupted = true;
+        forceInterrupted = true;
+
+        notifyAll();
+    }
+
+    /**
+     * Callback for waking up awaiting when new segment is archived.
+     */
+    private synchronized void onSegmentArchived(long lastAbsArchivedIdx) {
+        notifyAll();
+    }
+
+    /**
+     * Check for interrupt flag was set.
+     */
+    private void checkInterrupted() throws IgniteInterruptedCheckedException {
+        if (interrupted)
+            throw new IgniteInterruptedCheckedException("Interrupt waiting of 
change current idx");
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
new file mode 100644
index 0000000..2e145e7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentLockStorage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import java.util.HashMap;
+import java.util.Map;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
+
+/**
+ * Lock on segment protects from archiving segment.
+ */
+public class SegmentLockStorage extends SegmentObservable {
+    /**
+     * Maps absolute segment index to locks counter. Lock on segment protects 
from archiving segment and may come from
+     * {@link FileWriteAheadLogManager.RecordsIterator} during WAL replay. Map 
itself is guarded by <code>this</code>.
+     */
+    private Map<Long, Integer> locked = new HashMap<>();
+
+    /**
+     * Check if WAL segment locked (protected from move to archive)
+     *
+     * @param absIdx Index for check reservation.
+     * @return {@code True} if index is locked.
+     */
+    public synchronized boolean locked(long absIdx) {
+        return locked.containsKey(absIdx);
+    }
+
+    /**
+     * @param absIdx Segment absolute index.
+     * @return <ul><li>{@code True} if can read, no lock is held, 
</li><li>{@code false} if work segment, need release
+     * segment later, use {@link #releaseWorkSegment} for unlock</li> </ul>
+     */
+    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+    synchronized boolean lockWorkSegment(long absIdx) {
+        Integer cur = locked.get(absIdx);
+
+        cur = cur == null ? 1 : cur + 1;
+
+        locked.put(absIdx, cur);
+
+        return false;
+    }
+
+    /**
+     * @param absIdx Segment absolute index.
+     */
+    @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
+    synchronized void releaseWorkSegment(long absIdx) {
+        Integer cur = locked.get(absIdx);
+
+        assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
+
+        if (cur == 1)
+            locked.remove(absIdx);
+        else
+            locked.put(absIdx, cur - 1);
+
+        notifyObservers(absIdx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
new file mode 100644
index 0000000..ba5ad30
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentObservable.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of observer-observable pattern. For handling specific 
changes of segment.
+ */
+public abstract class SegmentObservable {
+    /** Observers for handle changes of archived index. */
+    private final List<Consumer<Long>> observers = new ArrayList<>();
+
+    /**
+     * @param observer Observer for notification about segment's changes.
+     */
+    synchronized void addObserver(Consumer<Long> observer) {
+        observers.add(observer);
+    }
+
+    /**
+     * Notify observers about changes.
+     *
+     * @param segmentId Segment which was been changed.
+     */
+    synchronized void notifyObservers(long segmentId) {
+        observers.forEach(observer -> observer.accept(segmentId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
new file mode 100644
index 0000000..50c2bbf
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/aware/SegmentReservationStorage.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.wal.aware;
+
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+/**
+ * Segment reservations storage: Protects WAL segments from deletion during 
WAL log cleanup.
+ */
+class SegmentReservationStorage {
+    /**
+     * Maps absolute segment index to reservation counter. If counter > 0 then 
we wouldn't delete all segments which has
+     * index >= reserved segment index. Guarded by {@code this}.
+     */
+    private NavigableMap<Long, Integer> reserved = new TreeMap<>();
+
+    /**
+     * @param absIdx Index for reservation.
+     */
+    synchronized void reserve(long absIdx) {
+        reserved.merge(absIdx, 1, (a, b) -> a + b);
+    }
+
+    /**
+     * Checks if segment is currently reserved (protected from deletion during 
WAL cleanup).
+     *
+     * @param absIdx Index for check reservation.
+     * @return {@code True} if index is reserved.
+     */
+    synchronized boolean reserved(long absIdx) {
+        return reserved.floorKey(absIdx) != null;
+    }
+
+    /**
+     * @param absIdx Reserved index.
+     */
+    synchronized void release(long absIdx) {
+        Integer cur = reserved.get(absIdx);
+
+        assert cur != null && cur >= 1 : "cur=" + cur + ", absIdx=" + absIdx;
+
+        if (cur == 1)
+            reserved.remove(absIdx);
+        else
+            reserved.put(absIdx, cur - 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java
new file mode 100644
index 0000000..d19d17b
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/FileInput.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.io;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferBackedDataInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * File input, backed by byte buffer file input.
+ * This class allows to read data by chunks from file and then read primitives.
+ */
+public interface FileInput extends ByteBufferBackedDataInput {
+    /**
+     * File I/O.
+     */
+    FileIO io();
+
+    /**
+     * @param pos Position in bytes from file begin.
+     */
+    void seek(long pos) throws IOException;
+
+    /**
+     * @return Position in the stream.
+     */
+    long position();
+
+    /**
+     * @param skipCheck If CRC check should be skipped.
+     * @return autoclosable fileInput, after its closing crc32 will be 
calculated and compared with saved one
+     */
+    SimpleFileInput.Crc32CheckingFileInput startRead(boolean skipCheck);
+
+    /**
+     * Checking of CRC32.
+     */
+    public class Crc32CheckingFileInput implements ByteBufferBackedDataInput, 
AutoCloseable {
+        /** */
+        private final PureJavaCrc32 crc32 = new PureJavaCrc32();
+
+        /** Last calc position. */
+        private int lastCalcPosition;
+
+        /** Skip crc check. */
+        private boolean skipCheck;
+
+        /** */
+        private FileInput delegate;
+
+        /**
+         */
+        public Crc32CheckingFileInput(FileInput delegate, boolean skipCheck) {
+            this.delegate = delegate;
+            this.lastCalcPosition = delegate.buffer().position();
+            this.skipCheck = skipCheck;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void ensure(int requested) throws IOException {
+            int available = buffer().remaining();
+
+            if (available >= requested)
+                return;
+
+            updateCrc();
+
+            delegate.ensure(requested);
+
+            lastCalcPosition = 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws Exception {
+            updateCrc();
+
+            int val = crc32.getValue();
+
+            int writtenCrc =  this.readInt();
+
+            if ((val ^ writtenCrc) != 0 && !skipCheck) {
+                // If it last message we will skip it (EOF will be thrown).
+                ensure(5);
+
+                throw new IgniteDataIntegrityViolationException(
+                    "val: " + val + " writtenCrc: " + writtenCrc
+                );
+            }
+        }
+
+        /**
+         *
+         */
+        private void updateCrc() {
+            if (skipCheck)
+                return;
+
+            int oldPos = buffer().position();
+
+            buffer().position(lastCalcPosition);
+
+            crc32.update(delegate.buffer(), oldPos - lastCalcPosition);
+
+            lastCalcPosition = oldPos;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int skipBytes(int n) throws IOException {
+            ensure(n);
+
+            int skipped = Math.min(buffer().remaining(), n);
+
+            buffer().position(buffer().position() + skipped);
+
+            return skipped;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public void readFully(@NotNull byte[] b) throws IOException {
+            ensure(b.length);
+
+            buffer().get(b);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public void readFully(@NotNull byte[] b, int off, int len) 
throws IOException {
+            ensure(len);
+
+            buffer().get(b, off, len);
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public boolean readBoolean() throws IOException {
+            return readByte() == 1;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public byte readByte() throws IOException {
+            ensure(1);
+
+            return buffer().get();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public int readUnsignedByte() throws IOException {
+            return readByte() & 0xFF;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public short readShort() throws IOException {
+            ensure(2);
+
+            return buffer().getShort();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public int readUnsignedShort() throws IOException {
+            return readShort() & 0xFFFF;
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public char readChar() throws IOException {
+            ensure(2);
+
+            return buffer().getChar();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public int readInt() throws IOException {
+            ensure(4);
+
+            return buffer().getInt();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public long readLong() throws IOException {
+            ensure(8);
+
+            return buffer().getLong();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public float readFloat() throws IOException {
+            ensure(4);
+
+            return buffer().getFloat();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public double readDouble() throws IOException {
+            ensure(8);
+
+            return buffer().getDouble();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public String readLine() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * {@inheritDoc}
+         */
+        @Override public String readUTF() throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        /** {@inheritDoc} */
+        @Override public ByteBuffer buffer() {
+            return delegate.buffer();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java
new file mode 100644
index 0000000..6e5a7a4
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedReadFileInput.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.io;
+
+import java.io.IOException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+
+/**
+ * File input, backed by byte buffer file input. This class allows to read 
data by chunks from file and then read
+ * primitives.
+ *
+ * This implementation locks segment only for reading to buffer and also can 
switch reading segment from work directory
+ * to archive directory if needed.
+ */
+final class LockedReadFileInput extends SimpleFileInput {
+    /** Segment for read. */
+    private final long segmentId;
+    /** Holder of actual information of latest manipulation on WAL segments. */
+    private final SegmentAware segmentAware;
+    /** Factory of file I/O for segment. */
+    private final SegmentIoFactory fileIOFactory;
+    /** Last read was from archive or not. */
+    private boolean isLastReadFromArchive;
+
+    /**
+     * @param buf Buffer for reading blocks of data into.
+     * @param initFileIo Initial File I/O for reading.
+     * @param segmentAware Holder of actual information of latest manipulation 
on WAL segments.
+     * @param segmentIOFactory Factory of file I/O for segment.
+     * @throws IOException if initFileIo would be fail during reading.
+     */
+    LockedReadFileInput(
+        ByteBufferExpander buf,
+        SegmentIO initFileIo,
+        SegmentAware segmentAware,
+        SegmentIoFactory segmentIOFactory
+    ) throws IOException {
+        super(initFileIo, buf);
+        this.segmentAware = segmentAware;
+        this.fileIOFactory = segmentIOFactory;
+        this.segmentId = initFileIo.getSegmentId();
+        isLastReadFromArchive = segmentAware.lastArchivedAbsoluteIndex() >= 
initFileIo.getSegmentId();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void ensure(int requested) throws IOException {
+        int available = buffer().remaining();
+
+        if (available >= requested)
+            return;
+
+        boolean readArchive = 
segmentAware.checkCanReadArchiveOrReserveWorkSegment(segmentId);
+        try {
+            if (readArchive && !isLastReadFromArchive) {
+                isLastReadFromArchive = true;
+
+                refreshIO();
+            }
+
+            super.ensure(requested);
+        }
+        finally {
+            if (!readArchive)
+                segmentAware.releaseWorkSegment(segmentId);
+        }
+    }
+
+    /**
+     * Refresh current file io.
+     *
+     * @throws IOException if old fileIO is fail during reading or new file is 
fail during creation.
+     */
+    private void refreshIO() throws IOException {
+        FileIO io = fileIOFactory.build(segmentId);
+
+        io.position(io().position());
+
+        io().close();
+
+        this.io = io;
+    }
+
+    /**
+     * Resolving fileIo for segment.
+     */
+    interface SegmentIoFactory {
+        /**
+         * @param segmentId Segment for IO action.
+         * @return {@link FileIO}.
+         * @throws IOException if creation would be fail.
+         */
+        FileIO build(long segmentId) throws IOException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedSegmentFileInputFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedSegmentFileInputFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedSegmentFileInputFactory.java
new file mode 100644
index 0000000..f3cdda7
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/LockedSegmentFileInputFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.io;
+
+import java.io.IOException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.SegmentRouter;
+
+/**
+ * Implementation of factory to provide I/O interfaces for read primitives 
with files.
+ *
+ * Creating {@link FileInput} with ability locked segment during reading.
+ */
+public class LockedSegmentFileInputFactory implements SegmentFileInputFactory {
+    /** Holder of actual information of latest manipulation on WAL segments. */
+    private final SegmentAware segmentAware;
+    /** Manager of segment location. */
+    private final SegmentRouter segmentRouter;
+    /** {@link FileIO} factory definition.*/
+    private final FileIOFactory fileIOFactory;
+
+    /**
+     * @param segmentAware Holder of actual information of latest manipulation 
on WAL segments.
+     * @param segmentRouter Manager of segment location.
+     * @param fileIOFactory {@link FileIO} factory definition.
+     */
+    public LockedSegmentFileInputFactory(
+        SegmentAware segmentAware,
+        SegmentRouter segmentRouter,
+        FileIOFactory fileIOFactory) {
+        this.segmentAware = segmentAware;
+        this.segmentRouter = segmentRouter;
+        this.fileIOFactory = fileIOFactory;
+    }
+
+    /** {@inheritDoc} */
+    @Override public FileInput createFileInput(SegmentIO segmentIO, 
ByteBufferExpander buf) throws IOException {
+        return new LockedReadFileInput(
+            buf,
+            segmentIO,
+            segmentAware,
+            id -> {
+                FileDescriptor segment = segmentRouter.findSegment(id);
+
+                return segment.toIO(fileIOFactory);
+            }
+        );
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentFileInputFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentFileInputFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentFileInputFactory.java
new file mode 100644
index 0000000..b688f90
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentFileInputFactory.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.io;
+
+import java.io.IOException;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.ByteBufferExpander;
+
+/**
+ * Factory to provide I/O interfaces for read primitives with files.
+ */
+public interface SegmentFileInputFactory {
+    /**
+     * @param segmentIO FileIO of segment for reading.
+     * @param buf ByteBuffer wrapper for dynamically expand buffer size.
+     * @return Instance of {@link FileInput}.
+     * @throws IOException If have some trouble with I/O.
+     */
+    FileInput createFileInput(SegmentIO segmentIO, ByteBufferExpander buf) 
throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentIO.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentIO.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentIO.java
new file mode 100644
index 0000000..d0a6445
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/io/SegmentIO.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.wal.io;
+
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+
+/**
+ * Implementation of {@link FileIO} specified for WAL segment file.
+ */
+public class SegmentIO extends FileIODecorator {
+    /** Segment id. */
+    private final long segmentId;
+
+    /**
+     * @param id Segment id.
+     * @param delegate File I/O delegate
+     */
+    public SegmentIO(long id, FileIO delegate) {
+        super(delegate);
+        segmentId = id;
+    }
+
+    /**
+     * @return Segment id.
+     */
+    public long getSegmentId() {
+        return segmentId;
+    }
+}

Reply via email to