IGNITE-8559 Fix WAL rollOver can be blocked by WAL iterator reservation - Fixes 
#4449.

Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f72fe75
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f72fe75
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f72fe75

Branch: refs/heads/ignite-gg-14206
Commit: 2f72fe758d4256c4eb4610e5922ad3d174b43dc5
Parents: 1573f45
Author: Anton Kalashnikov <kaa....@yandex.ru>
Authored: Tue Sep 25 13:50:35 2018 +0300
Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Committed: Tue Sep 25 13:50:35 2018 +0300

----------------------------------------------------------------------
 .../cache/persistence/file/FileIOFactory.java   |   1 -
 .../wal/AbstractWalRecordsIterator.java         |  50 +-
 .../cache/persistence/wal/FileDescriptor.java   |  17 +-
 .../cache/persistence/wal/FileInput.java        | 481 ---------------
 .../wal/FileWriteAheadLogManager.java           | 544 ++++++-----------
 .../wal/FsyncModeFileWriteAheadLogManager.java  | 108 ++--
 .../persistence/wal/SegmentArchivedMonitor.java |  64 --
 .../wal/SegmentReservationStorage.java          |  61 --
 .../cache/persistence/wal/SegmentRouter.java    |  90 +++
 .../SingleSegmentLogicalRecordsIterator.java    |  11 +-
 .../wal/aware/SegmentArchivedStorage.java       | 137 +++++
 .../persistence/wal/aware/SegmentAware.java     | 234 ++++++++
 .../wal/aware/SegmentCompressStorage.java       | 116 ++++
 .../wal/aware/SegmentCurrentStateStorage.java   | 171 ++++++
 .../wal/aware/SegmentLockStorage.java           |  76 +++
 .../wal/aware/SegmentObservable.java            |  46 ++
 .../wal/aware/SegmentReservationStorage.java    |  62 ++
 .../cache/persistence/wal/io/FileInput.java     | 258 ++++++++
 .../persistence/wal/io/LockedReadFileInput.java | 111 ++++
 .../wal/io/LockedSegmentFileInputFactory.java   |  68 +++
 .../wal/io/SegmentFileInputFactory.java         |  34 ++
 .../cache/persistence/wal/io/SegmentIO.java     |  45 ++
 .../persistence/wal/io/SimpleFileInput.java     | 272 +++++++++
 .../wal/io/SimpleSegmentFileInputFactory.java   |  33 +
 .../wal/reader/IgniteWalIteratorFactory.java    |  13 +-
 .../reader/StandaloneWalRecordsIterator.java    |  23 +-
 .../wal/serializer/RecordSerializer.java        |   2 +-
 .../wal/serializer/RecordV1Serializer.java      |  16 +-
 .../wal/serializer/RecordV2Serializer.java      |   2 +-
 .../wal/IgniteWalHistoryReservationsTest.java   |  87 ++-
 .../wal/IgniteWalIteratorSwitchSegmentTest.java | 124 +++-
 .../db/wal/crc/IgniteDataIntegrityTests.java    |   7 +-
 .../persistence/wal/aware/SegmentAwareTest.java | 601 +++++++++++++++++++
 .../StandaloneWalRecordsIteratorTest.java       |   2 +-
 .../ignite/testframework/GridTestUtils.java     | 108 ++--
 .../ignite/testsuites/IgnitePdsTestSuite.java   |   4 +
 .../db/wal/IgniteWalRecoveryTest.java           | 139 ++++-
 37 files changed, 3041 insertions(+), 1177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
index c3a75f5..2735185 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java
@@ -44,5 +44,4 @@ public interface FileIOFactory extends Serializable {
      * @throws IOException If I/O interface creation was failed.
      */
     public FileIO create(File file, OpenOption... modes) throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index aa8eb31..3cbe577 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -30,7 +30,9 @@ import 
org.apache.ignite.internal.pagemem.wal.record.WALRecord;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
-import 
org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
@@ -87,24 +89,29 @@ public abstract class AbstractWalRecordsIterator
     /** Utility buffer for reading records */
     private final ByteBufferExpander buf;
 
+    /** Factory to provide I/O interfaces for read primitives with files. */
+    private final SegmentFileInputFactory segmentFileInputFactory;
+
     /**
      * @param log Logger.
      * @param sharedCtx Shared context.
      * @param serializerFactory Serializer of current version to read headers.
      * @param ioFactory ioFactory for file IO access.
      * @param initialReadBufferSize buffer for reading records size.
+     * @param segmentFileInputFactory Factory to provide I/O interfaces for 
read primitives with files.
      */
     protected AbstractWalRecordsIterator(
         @NotNull final IgniteLogger log,
         @NotNull final GridCacheSharedContext sharedCtx,
         @NotNull final RecordSerializerFactory serializerFactory,
         @NotNull final FileIOFactory ioFactory,
-        final int initialReadBufferSize
-    ) {
+        final int initialReadBufferSize,
+        SegmentFileInputFactory segmentFileInputFactory) {
         this.log = log;
         this.sharedCtx = sharedCtx;
         this.serializerFactory = serializerFactory;
         this.ioFactory = ioFactory;
+        this.segmentFileInputFactory = segmentFileInputFactory;
 
         buf = new ByteBufferExpander(initialReadBufferSize, 
ByteOrder.nativeOrder());
     }
@@ -134,11 +141,8 @@ public abstract class AbstractWalRecordsIterator
     }
 
     /**
-     * Switches records iterator to the next record.
-     * <ul>
-     * <li>{@link #curRec} will be updated.</li>
-     * <li> If end of segment reached, switch to new segment is called. {@link 
#currWalSegment} will be updated.</li>
-     * </ul>
+     * Switches records iterator to the next record. <ul> <li>{@link #curRec} 
will be updated.</li> <li> If end of
+     * segment reached, switch to new segment is called. {@link 
#currWalSegment} will be updated.</li> </ul>
      *
      * {@code advance()} runs a step ahead {@link #next()}
      *
@@ -303,16 +307,16 @@ public abstract class AbstractWalRecordsIterator
     protected AbstractReadFileHandle initReadHandle(
         @NotNull final AbstractFileDescriptor desc,
         @Nullable final FileWALPointer start,
-        @NotNull final FileIO fileIO,
+        @NotNull final SegmentIO fileIO,
         @NotNull final SegmentHeader segmentHeader
     ) throws IgniteCheckedException {
         try {
-            final boolean isCompacted = segmentHeader.isCompacted();
+            boolean isCompacted = segmentHeader.isCompacted();
 
             if (isCompacted)
                 serializerFactory.skipPositionCheck(true);
 
-            FileInput in = new FileInput(fileIO, buf);
+            FileInput in = segmentFileInputFactory.createFileInput(fileIO, 
buf);
 
             if (start != null && desc.idx() == start.index()) {
                 if (isCompacted) {
@@ -329,7 +333,7 @@ public abstract class AbstractWalRecordsIterator
 
             int serVer = segmentHeader.getSerializerVersion();
 
-            return createReadFileHandle(fileIO, desc.idx(), 
serializerFactory.createSerializer(serVer), in);
+            return createReadFileHandle(fileIO, 
serializerFactory.createSerializer(serVer), in);
         }
         catch (SegmentEofException | EOFException ignore) {
             try {
@@ -368,15 +372,15 @@ public abstract class AbstractWalRecordsIterator
         @NotNull final AbstractFileDescriptor desc,
         @Nullable final FileWALPointer start
     ) throws IgniteCheckedException, FileNotFoundException {
-        FileIO fileIO = null;
+        SegmentIO fileIO = null;
 
         try {
-            fileIO = desc.isCompressed() ? new UnzipFileIO(desc.file()) : 
ioFactory.create(desc.file());
+            fileIO = desc.toIO(ioFactory);
 
             SegmentHeader segmentHeader;
 
             try {
-                segmentHeader = readSegmentHeader(fileIO, curWalSegmIdx);
+                segmentHeader = readSegmentHeader(fileIO, 
segmentFileInputFactory);
             }
             catch (SegmentEofException | EOFException ignore) {
                 try {
@@ -411,8 +415,7 @@ public abstract class AbstractWalRecordsIterator
 
     /** */
     protected abstract AbstractReadFileHandle createReadFileHandle(
-        FileIO fileIO,
-        long idx,
+        SegmentIO fileIO,
         RecordSerializer ser,
         FileInput in
     );
@@ -460,7 +463,9 @@ public abstract class AbstractWalRecordsIterator
         /** */
         RecordSerializer ser();
 
-        /** */
+        /**
+         *
+         */
         boolean workDir();
     }
 
@@ -474,5 +479,14 @@ public abstract class AbstractWalRecordsIterator
 
         /** */
         long idx();
+
+        /**
+         * Creates segment I/O for the file described by this descriptor.
+         *
+         * @param fileIOFactory Factory used to create the underlying file I/O.
+         * @return Segment I/O backed by one of the {@link FileIO} implementations.
+         * @throws IOException If the file I/O could not be created.
+         */
+        SegmentIO toIO(FileIOFactory fileIOFactory) throws IOException;
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
index a73248a..f265376 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java
@@ -17,15 +17,17 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.wal;
 
+import java.io.File;
+import java.io.IOException;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
 import org.apache.ignite.internal.util.typedef.internal.SB;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
-import java.io.File;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * WAL file descriptor.
  */
@@ -134,4 +136,11 @@ public class FileDescriptor implements 
Comparable<FileDescriptor>, AbstractWalRe
     @Override public long idx() {
         return idx;
     }
+
+    /** {@inheritDoc} */
+    @Override public SegmentIO toIO(FileIOFactory fileIOFactory) throws 
IOException {
+        FileIO fileIO = isCompressed() ? new UnzipFileIO(file()) : 
fileIOFactory.create(file());
+
+        return new SegmentIO(idx, fileIO);
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
deleted file mode 100644
index 303a023..0000000
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java
+++ /dev/null
@@ -1,481 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.cache.persistence.wal;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
-import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * File input, backed by byte buffer file input.
- * This class allows to read data by chunks from file and then read primitives
- */
-public final class FileInput implements ByteBufferBackedDataInput {
-    /**
-     * Buffer for reading blocks of data into.
-     * <b>Note:</b> biggest block requested from this input can't be longer 
than buffer capacity
-     */
-    private ByteBuffer buf;
-
-    /** I/O interface for read/write operations with file */
-    private FileIO io;
-
-    /** */
-    private long pos;
-
-    /** */
-    private ByteBufferExpander expBuf;
-
-    /**
-     * @param io FileIO to read from.
-     * @param buf Buffer for reading blocks of data into.
-     */
-    public FileInput(FileIO io, ByteBufferExpander buf) throws IOException {
-        assert io != null;
-
-        this.io = io;
-        this.buf = buf.buffer();
-
-        expBuf = buf;
-
-        pos = io.position();
-
-        clearBuffer();
-    }
-
-    /**
-     * File I/O.
-     */
-    public FileIO io() {
-        return io;
-    }
-
-    /**
-     * Clear buffer.
-     */
-    private void clearBuffer() {
-        buf.clear();
-        buf.limit(0);
-
-        assert buf.remaining() == 0; // Buffer is empty.
-    }
-
-    /**
-     * @param pos Position in bytes from file begin.
-     */
-    public void seek(long pos) throws IOException {
-        if (pos > io.size())
-            throw new EOFException();
-
-        io.position(pos);
-
-        this.pos = pos;
-
-        clearBuffer();
-    }
-
-    /**
-     * @return Underlying buffer.
-     */
-    @Override public ByteBuffer buffer() {
-        return buf;
-    }
-
-
-    /** {@inheritDoc} */
-    @Override public void ensure(int requested) throws IOException {
-        int available = buf.remaining();
-
-        if (available >= requested)
-            return;
-
-        if (buf.capacity() < requested) {
-            if (expBuf == null)
-                throw new IOException("Requested size is greater than buffer: 
" + requested);
-
-            buf = expBuf.expand(requested);
-
-            assert available == buf.remaining();
-        }
-
-        buf.compact();
-
-        do {
-            int read = io.read(buf);
-
-            if (read == -1)
-                throw new EOFException("EOF at position [" + io.position() + 
"] expected to read [" + requested + "] bytes");
-
-            available += read;
-
-            pos += read;
-        }
-        while (available < requested);
-
-        buf.flip();
-    }
-
-    /**
-     * @return Position in the stream.
-     */
-    public long position() {
-        return pos - buf.remaining();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public void readFully(@NotNull byte[] b) throws IOException {
-        ensure(b.length);
-
-        buf.get(b);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public void readFully(@NotNull byte[] b, int off, int len) 
throws IOException {
-        ensure(len);
-
-        buf.get(b, off, len);
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public int skipBytes(int n) throws IOException {
-        if (buf.remaining() >= n)
-            buf.position(buf.position() + n);
-        else
-            seek(pos + n);
-
-        return n;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public boolean readBoolean() throws IOException {
-        return readByte() == 1;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public byte readByte() throws IOException {
-        ensure(1);
-
-        return buf.get();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public int readUnsignedByte() throws IOException {
-        return readByte() & 0xFF;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public short readShort() throws IOException {
-        ensure(2);
-
-        return buf.getShort();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public int readUnsignedShort() throws IOException {
-        return readShort() & 0xFFFF;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public char readChar() throws IOException {
-        ensure(2);
-
-        return buf.getChar();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public int readInt() throws IOException {
-        ensure(4);
-
-        return buf.getInt();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public long readLong() throws IOException {
-        ensure(8);
-
-        return buf.getLong();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public float readFloat() throws IOException {
-        ensure(4);
-
-        return buf.getFloat();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public double readDouble() throws IOException {
-        ensure(8);
-
-        return buf.getDouble();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public String readLine() throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override public String readUTF() throws IOException {
-        throw new UnsupportedOperationException();
-    }
-
-    /**
-     * @param skipCheck If CRC check should be skipped.
-     * @return autoclosable fileInput, after its closing crc32 will be 
calculated and compared with saved one
-     */
-    public Crc32CheckingFileInput startRead(boolean skipCheck) {
-        return new Crc32CheckingFileInput(buf.position(), skipCheck);
-    }
-
-    /**
-     * Checking of CRC32.
-     */
-    public class Crc32CheckingFileInput implements ByteBufferBackedDataInput, 
AutoCloseable {
-        /** */
-        private final PureJavaCrc32 crc32 = new PureJavaCrc32();
-
-        /** Last calc position. */
-        private int lastCalcPosition;
-
-        /** Skip crc check. */
-        private boolean skipCheck;
-
-        /**
-         * @param position Position.
-         */
-        public Crc32CheckingFileInput(int position, boolean skipCheck) {
-            this.lastCalcPosition = position;
-            this.skipCheck = skipCheck;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void ensure(int requested) throws IOException {
-            int available = buf.remaining();
-
-            if (available >= requested)
-                return;
-
-            updateCrc();
-
-            FileInput.this.ensure(requested);
-
-            lastCalcPosition = 0;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void close() throws Exception {
-            updateCrc();
-
-            int val = crc32.getValue();
-
-            int writtenCrc =  this.readInt();
-
-            if ((val ^ writtenCrc) != 0 && !skipCheck) {
-                // If it last message we will skip it (EOF will be thrown).
-                ensure(5);
-
-                throw new IgniteDataIntegrityViolationException(
-                    "val: " + val + " writtenCrc: " + writtenCrc
-                );
-            }
-        }
-
-        /**
-         *
-         */
-        private void updateCrc() {
-            if (skipCheck)
-                return;
-
-            int oldPos = buf.position();
-
-            buf.position(lastCalcPosition);
-
-            crc32.update(buf, oldPos - lastCalcPosition);
-
-            lastCalcPosition = oldPos;
-        }
-
-        /** {@inheritDoc} */
-        @Override public int skipBytes(int n) throws IOException {
-            ensure(n);
-
-            int skipped = Math.min(buf.remaining(), n);
-
-            buf.position(buf.position() + skipped);
-
-            return skipped;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public void readFully(@NotNull byte[] b) throws IOException {
-            ensure(b.length);
-
-            buf.get(b);
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public void readFully(@NotNull byte[] b, int off, int len) 
throws IOException {
-            ensure(len);
-
-            buf.get(b, off, len);
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public boolean readBoolean() throws IOException {
-            return readByte() == 1;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public byte readByte() throws IOException {
-            ensure(1);
-
-            return buf.get();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public int readUnsignedByte() throws IOException {
-            return readByte() & 0xFF;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public short readShort() throws IOException {
-            ensure(2);
-
-            return buf.getShort();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public int readUnsignedShort() throws IOException {
-            return readShort() & 0xFFFF;
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public char readChar() throws IOException {
-            ensure(2);
-
-            return buf.getChar();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public int readInt() throws IOException {
-            ensure(4);
-
-            return buf.getInt();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public long readLong() throws IOException {
-            ensure(8);
-
-            return buf.getLong();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public float readFloat() throws IOException {
-            ensure(4);
-
-            return buf.getFloat();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public double readDouble() throws IOException {
-            ensure(8);
-
-            return buf.getDouble();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public String readLine() throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        /**
-         * {@inheritDoc}
-         */
-        @Override public String readUTF() throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        /** {@inheritDoc} */
-        @Override public ByteBuffer buffer() {
-            return FileInput.this.buffer();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index f53c02f..8765309 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -97,7 +97,13 @@ import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccess
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.LockedSegmentFileInputFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
@@ -251,13 +257,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE =
         
IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, 
0.5);
 
-    /** Interrupted flag. */
-    private final ThreadLocal<Boolean> interrupted = new 
ThreadLocal<Boolean>() {
-        @Override protected Boolean initialValue() {
-            return false;
-        }
-    };
-
     /** */
     private final boolean alwaysWriteFullPages;
 
@@ -310,17 +309,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private final int serializerVer =
         IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, 
LATEST_SERIALIZER_VERSION);
 
-    /** Latest segment cleared by {@link #truncate(WALPointer, WALPointer)}. */
-    private volatile long lastTruncatedArchiveIdx = -1L;
-
     /** Factory to provide I/O interfaces for read/write operations with files 
*/
     private volatile FileIOFactory ioFactory;
 
-    /** Next WAL segment archived monitor. Manages last archived index, 
emulates archivation in no-archiver mode. */
-    private final SegmentArchivedMonitor archivedMonitor = new 
SegmentArchivedMonitor();
+    /** Factory to provide I/O interfaces for read primitives with files */
+    private final SegmentFileInputFactory segmentFileInputFactory;
 
-    /** Segment reservations storage: Protects WAL segments from deletion 
during WAL log cleanup. */
-    private final SegmentReservationStorage reservationStorage = new 
SegmentReservationStorage();
+    /** Holder of up-to-date information about the latest manipulations on WAL segments. */
+    private final SegmentAware segmentAware;
 
     /** Updater for {@link #currHnd}, used for verify there are no concurrent 
update for current log segment handle */
     private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, 
FileWriteHandle> CURR_HND_UPD =
@@ -384,6 +380,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private WalSegmentSyncer walSegmentSyncWorker;
 
     /**
+     * Manages segment location.
+     */
+    private SegmentRouter segmentRouter;
+
+    /** Segment file input factory with the ability to lock a segment while it is being read. */
+    private SegmentFileInputFactory lockedSegmentFileInputFactory;
+
+    /**
      * @param ctx Kernal context.
      */
     public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
@@ -401,6 +405,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         fsyncDelay = dsCfg.getWalFsyncDelayNanos();
         alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages();
         ioFactory = new RandomAccessFileIOFactory();
+        segmentFileInputFactory = new SimpleSegmentFileInputFactory();
         walAutoArchiveAfterInactivity = 
dsCfg.getWalAutoArchiveAfterInactivity();
 
         maxSegCountWithoutCheckpoint =
@@ -410,6 +415,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         evt = ctx.event();
         failureProcessor = ctx.failure();
+        segmentAware = new SegmentAware(dsCfg.getWalSegments());
     }
 
     /**
@@ -467,7 +473,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
 
-            lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1;
+            segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() 
- 1);
 
             long lastAbsArchivedIdx = tup == null ? -1 : tup.get2();
 
@@ -477,7 +483,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 archiver = null;
 
             if (lastAbsArchivedIdx > 0)
-                
archivedMonitor.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
+                segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
 
             if (dsCfg.isWalCompactionEnabled()) {
                 compressor = new FileCompressor();
@@ -489,6 +495,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 }
             }
 
+            segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, 
segmentAware, dsCfg);
+
             walDisableContext = cctx.walState().walDisableContext();
 
             if (mode != WALMode.NONE && mode != WALMode.FSYNC) {
@@ -501,6 +509,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             else
                 U.quietAndWarn(log, "Started write-ahead log manager in NONE 
mode, persisted data may be lost in " +
                     "a case of unexpected node failure. Make sure to 
deactivate the cluster before shutdown.");
+
+            lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory(
+                segmentAware,
+                segmentRouter,
+                ioFactory
+            );
         }
     }
 
@@ -525,7 +539,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     public Collection<File> getAndReserveWalFiles(FileWALPointer low, 
FileWALPointer high) throws IgniteCheckedException {
         final long awaitIdx = high.index() - 1;
 
-        archivedMonitor.awaitSegmentArchived(awaitIdx);
+        segmentAware.awaitSegmentArchived(awaitIdx);
 
         if (!reserve(low))
             throw new IgniteCheckedException("WAL archive segment has been 
deleted [idx=" + low.index() + "]");
@@ -598,6 +612,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             if (walWriter != null)
                 walWriter.shutdown();
 
+            segmentAware.interrupt();
+
             if (archiver != null)
                 archiver.shutdown();
 
@@ -792,14 +808,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             if (rec.rollOver()) {
                 assert cctx.database().checkpointLockIsHeldByThread();
 
-                long idx = currWrHandle.idx;
+                long idx = currWrHandle.getSegmentId();
 
                 currWrHandle.buf.close();
 
                 currWrHandle = rollOver(currWrHandle);
 
                 if (log != null && log.isInfoEnabled())
-                    log.info("Rollover segment [" + idx + " to " + 
currWrHandle.idx + "], recordType=" + rec.type());
+                    log.info("Rollover segment [" + idx + " to " + 
currWrHandle.getSegmentId() + "], recordType=" + rec.type());
             }
 
             WALPointer ptr = currWrHandle.addRecord(rec);
@@ -863,8 +879,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         return new RecordsIterator(
             cctx,
-            walWorkDir,
             walArchiveDir,
+            walWorkDir,
             (FileWALPointer)start,
             end,
             dsCfg,
@@ -872,8 +888,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             ioFactory,
             archiver,
             decompressor,
-            log
-        );
+            log,
+            segmentAware,
+            segmentRouter,
+            lockedSegmentFileInputFactory);
     }
 
     /** {@inheritDoc} */
@@ -883,10 +901,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         if (mode == WALMode.NONE)
             return false;
 
-        reservationStorage.reserve(((FileWALPointer)start).index());
+        segmentAware.reserve(((FileWALPointer)start).index());
 
         if (!hasIndex(((FileWALPointer)start).index())) {
-            reservationStorage.release(((FileWALPointer)start).index());
+            segmentAware.release(((FileWALPointer)start).index());
 
             return false;
         }
@@ -901,7 +919,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         if (mode == WALMode.NONE)
             return;
 
-        reservationStorage.release(((FileWALPointer)start).index());
+        segmentAware.release(((FileWALPointer)start).index());
     }
 
     /**
@@ -924,7 +942,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         FileWriteHandle cur = currHnd;
 
-        return cur != null && cur.idx >= absIdx;
+        return cur != null && cur.getSegmentId() >= absIdx;
     }
 
     /** {@inheritDoc} */
@@ -950,7 +968,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             if (segmentReservedOrLocked(desc.idx))
                 return deleted;
 
-            long archivedAbsIdx = archivedMonitor.lastArchivedAbsoluteIndex();
+            long archivedAbsIdx = segmentAware.lastArchivedAbsoluteIndex();
 
             long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : 
lastArchivedIndex();
 
@@ -963,8 +981,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     deleted++;
 
                 // Bump up the oldest archive segment index.
-                if (lastTruncatedArchiveIdx < desc.idx)
-                    lastTruncatedArchiveIdx = desc.idx;
+                if (segmentAware.lastTruncatedArchiveIdx() < desc.idx)
+                    segmentAware.lastTruncatedArchiveIdx(desc.idx);
             }
         }
 
@@ -981,9 +999,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private boolean segmentReservedOrLocked(long absIdx) {
         FileArchiver archiver0 = archiver;
 
-        return ((archiver0 != null) && archiver0.locked(absIdx))
-            || (reservationStorage.reserved(absIdx));
-
+        return ((archiver0 != null) && segmentAware.locked(absIdx)) || 
(segmentAware.reserved(absIdx));
     }
 
     /** {@inheritDoc} */
@@ -994,9 +1010,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
     /** {@inheritDoc} */
     @Override public int walArchiveSegments() {
-        long lastTruncated = lastTruncatedArchiveIdx;
+        long lastTruncated = segmentAware.lastTruncatedArchiveIdx();
 
-        long lastArchived = archivedMonitor.lastArchivedAbsoluteIndex();
+        long lastArchived = segmentAware.lastArchivedAbsoluteIndex();
 
         if (lastArchived == -1)
             return 0;
@@ -1008,12 +1024,12 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
     /** {@inheritDoc} */
     @Override public long lastArchivedSegment() {
-        return archivedMonitor.lastArchivedAbsoluteIndex();
+        return segmentAware.lastArchivedAbsoluteIndex();
     }
 
     /** {@inheritDoc} */
     @Override public long lastCompactedSegment() {
-        return compressor != null ? compressor.lastCompressedIdx : -1L;
+        return segmentAware.lastCompressedIdx();
     }
 
     /** {@inheritDoc} */
@@ -1170,7 +1186,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             next.writeHeader();
 
-            if (next.idx - lashCheckpointFileIdx() >= 
maxSegCountWithoutCheckpoint)
+            if (next.getSegmentId() - lashCheckpointFileIdx() >= 
maxSegCountWithoutCheckpoint)
                 cctx.database().forceCheckpoint("too big size of WAL without 
checkpoint");
 
             boolean swapped = CURR_HND_UPD.compareAndSet(this, hnd, next);
@@ -1216,7 +1232,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         int len = lastReadPtr == null ? 0 : lastReadPtr.length();
 
         try {
-            FileIO fileIO = ioFactory.create(curFile);
+            SegmentIO fileIO = new SegmentIO(absIdx, 
ioFactory.create(curFile));
 
             IgniteInClosure<FileIO> lsnr = createWalFileListener;
 
@@ -1229,7 +1245,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 // If we have existing segment, try to read version from it.
                 if (lastReadPtr != null) {
                     try {
-                        serVer = readSegmentHeader(fileIO, 
absIdx).getSerializerVersion();
+                        serVer = readSegmentHeader(fileIO, 
segmentFileInputFactory).getSerializerVersion();
                     }
                     catch (SegmentEofException | EOFException ignore) {
                         serVer = serializerVer;
@@ -1257,16 +1273,15 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                 FileWriteHandle hnd = new FileWriteHandle(
                     fileIO,
-                    absIdx,
                     off + len,
                     true,
                     ser,
                     rbuf);
 
                 if (archiver0 != null)
-                    archiver0.currentWalIndex(absIdx);
+                    segmentAware.curAbsWalIdx(absIdx);
                 else
-                    archivedMonitor.setLastArchivedAbsoluteIndex(absIdx - 1);
+                    segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1);
 
                 return hnd;
             }
@@ -1301,22 +1316,22 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         IgniteCheckedException error = null;
 
         try {
-            File nextFile = pollNextFile(cur.idx);
+            File nextFile = pollNextFile(cur.getSegmentId());
 
             if (log.isDebugEnabled())
                 log.debug("Switching to a new WAL segment: " + 
nextFile.getAbsolutePath());
 
             SegmentedRingByteBuffer rbuf = null;
 
-            FileIO fileIO = null;
+            SegmentIO fileIO = null;
 
             FileWriteHandle hnd;
 
-            boolean interrupted = this.interrupted.get();
+            boolean interrupted = false;
 
             while (true) {
                 try {
-                    fileIO = ioFactory.create(nextFile);
+                    fileIO = new SegmentIO(cur.getSegmentId() + 1, 
ioFactory.create(nextFile));
 
                     IgniteInClosure<FileIO> lsnr = createWalFileListener;
                     if (lsnr != null)
@@ -1332,7 +1347,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
                     hnd = new FileWriteHandle(
                         fileIO,
-                        cur.idx + 1,
                         0,
                         false,
                         serializer,
@@ -1365,9 +1379,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         rbuf = null;
                     }
                 }
-                finally {
-                    this.interrupted.set(false);
-                }
             }
 
             return hnd;
@@ -1521,17 +1532,17 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      * @return File ready for use as new WAL segment.
      * @throws StorageException If exception occurred in the archiver thread.
      */
-    private File pollNextFile(long curIdx) throws StorageException {
+    private File pollNextFile(long curIdx) throws StorageException, 
IgniteInterruptedCheckedException {
         FileArchiver archiver0 = archiver;
 
         if (archiver0 == null) {
-            archivedMonitor.setLastArchivedAbsoluteIndex(curIdx);
+            segmentAware.setLastArchivedAbsoluteIndex(curIdx);
 
             return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1));
         }
 
         // Signal to archiver that we are done with the segment and it can be 
archived.
-        long absNextIdx = archiver0.nextAbsoluteSegmentIndex(curIdx);
+        long absNextIdx = archiver0.nextAbsoluteSegmentIndex();
 
         long segmentIdx = absNextIdx % dsCfg.getWalSegments();
 
@@ -1624,23 +1635,14 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      *
      * Monitor of current object is used for notify on: <ul> <li>exception 
occurred ({@link
      * FileArchiver#cleanErr}!=null)</li> <li>stopping thread ({@link 
FileArchiver#stopped}==true)</li> <li>current file
-     * index changed ({@link FileArchiver#curAbsWalIdx})</li> <li>last 
archived file index was changed ({@link
-     * FileArchiver#lastAbsArchivedIdx})</li> <li>some WAL index was removed 
from {@link FileArchiver#locked} map</li>
+     * index changed </li> <li>last archived file index was changed
+     * </li> <li>some WAL index was removed from map</li>
      * </ul>
      */
     private class FileArchiver extends GridWorker {
         /** Exception which occurred during initial creation of files or 
during archiving WAL segment */
         private StorageException cleanErr;
 
-        /**
-         * Absolute current segment index WAL Manager writes to. Guarded by 
<code>this</code>. Incremented during
-         * rollover. Also may be directly set if WAL is resuming logging after 
start.
-         */
-        private long curAbsWalIdx = -1;
-
-        /** Last archived file index (absolute, 0-based). Guarded by 
<code>this</code>. */
-        private volatile long lastAbsArchivedIdx = -1;
-
         /** current thread stopping advice */
         private volatile boolean stopped;
 
@@ -1648,19 +1650,13 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         private int formatted;
 
         /**
-         * Maps absolute segment index to locks counter. Lock on segment 
protects from archiving segment and may come
-         * from {@link RecordsIterator} during WAL replay. Map itself is 
guarded by <code>this</code>.
-         */
-        private Map<Long, Integer> locked = new HashMap<>();
-
-        /**
          *
          */
         private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) {
             super(cctx.igniteInstanceName(), "wal-file-archiver%" + 
cctx.igniteInstanceName(), log,
                 cctx.kernalContext().workersRegistry());
 
-            this.lastAbsArchivedIdx = lastAbsArchivedIdx;
+            segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
         }
 
         /**
@@ -1676,27 +1672,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             U.join(runner());
         }
 
-        /**
-         * @param curAbsWalIdx Current absolute WAL segment index.
-         */
-        private void currentWalIndex(long curAbsWalIdx) {
-            synchronized (this) {
-                this.curAbsWalIdx = curAbsWalIdx;
-
-                notifyAll();
-            }
-        }
-
-        /**
-         * Check if WAL segment locked (protected from move to archive)
-         *
-         * @param absIdx Index for check reservation.
-         * @return {@code True} if index is locked.
-         */
-        private synchronized boolean locked(long absIdx) {
-            return locked.containsKey(absIdx);
-        }
-
         /** {@inheritDoc} */
         @Override protected void body() {
             blockingSectionBegin();
@@ -1709,6 +1684,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     // Stop the thread and report to starter.
                     cleanErr = e;
 
+                    segmentAware.forceInterrupt();
+
                     notifyAll();
                 }
 
@@ -1723,44 +1700,24 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             Throwable err = null;
 
             try {
-                synchronized (this) {
-                    while (curAbsWalIdx == -1 && !stopped) {
-                        blockingSectionBegin();
-
-                        try {
-                            wait();
-                        }
-                        finally {
-                            blockingSectionEnd();
-                        }
-                    }
-
-                    // If the archive directory is empty, we can be sure that 
there were no WAL segments archived.
-                    // This is ensured by the check in truncate() which will 
leave at least one file there
-                    // once it was archived.
+                blockingSectionBegin();
+                try {
+                    segmentAware.awaitSegment(0);//wait for init at least one 
work segments.
+                }
+                finally {
+                    blockingSectionEnd();
                 }
-
                 while (!Thread.currentThread().isInterrupted() && !stopped) {
                     long toArchive;
 
-                    synchronized (this) {
-                        assert lastAbsArchivedIdx <= curAbsWalIdx : 
"lastArchived=" + lastAbsArchivedIdx +
-                            ", current=" + curAbsWalIdx;
-
-                        while (lastAbsArchivedIdx >= curAbsWalIdx - 1 && 
!stopped) {
-                            blockingSectionBegin();
-
-                            try {
-                                wait();
-                            }
-                            finally {
-                                blockingSectionEnd();
-                            }
-                        }
+                    blockingSectionBegin();
 
-                        toArchive = lastAbsArchivedIdx + 1;
+                    try {
+                        toArchive = 
segmentAware.waitNextSegmentForArchivation();
+                    }
+                    finally {
+                        blockingSectionEnd();
                     }
-
                     if (stopped)
                         break;
 
@@ -1775,40 +1732,32 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         blockingSectionEnd();
                     }
 
-                    synchronized (this) {
-                        while (locked.containsKey(toArchive) && !stopped) {
-                            blockingSectionBegin();
-
-                            try {
-                                wait();
-                            }
-                            finally {
-                                blockingSectionEnd();
-                            }
-                        }
-
-                        // Then increase counter to allow rollover on clean 
working file
-                        changeLastArchivedIndexAndNotifyWaiters(toArchive);
+                    blockingSectionBegin();
 
-                        notifyAll();
+                    try {
+                        segmentAware.markAsMovedToArchive(toArchive);
+                    }
+                    finally {
+                        blockingSectionEnd();
                     }
 
                     if (evt.isRecordable(EVT_WAL_SEGMENT_ARCHIVED)) {
                         evt.record(new WalSegmentArchivedEvent(
-                                cctx.discovery().localNode(),
-                                res.getAbsIdx(),
-                                res.getDstArchiveFile())
+                            cctx.discovery().localNode(),
+                            res.getAbsIdx(),
+                            res.getDstArchiveFile())
                         );
                     }
 
                     onIdle();
                 }
             }
-            catch (InterruptedException t) {
+            catch (IgniteInterruptedCheckedException e) {
                 Thread.currentThread().interrupt();
 
-                if (!stopped)
-                    err = t;
+                synchronized (this) {
+                    stopped = true;
+                }
             }
             catch (Throwable t) {
                 err = t;
@@ -1825,62 +1774,38 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /**
-         * @param idx Index.
-         */
-        private void changeLastArchivedIndexAndNotifyWaiters(long idx) {
-            lastAbsArchivedIdx = idx;
-
-            if (compressor != null)
-                compressor.onNextSegmentArchived();
-
-            archivedMonitor.setLastArchivedAbsoluteIndex(idx);
-        }
-
-        /**
          * Gets the absolute index of the next WAL segment available to write. 
Blocks till there are available file to
          * write
          *
-         * @param curIdx Current absolute index that we want to increment.
          * @return Next index (curWalSegmIdx+1) when it is ready to be written.
          * @throws StorageException If exception occurred in the archiver 
thread.
          */
-        private long nextAbsoluteSegmentIndex(long curIdx) throws 
StorageException {
+        private long nextAbsoluteSegmentIndex() throws StorageException, 
IgniteInterruptedCheckedException {
             synchronized (this) {
                 if (cleanErr != null)
                     throw cleanErr;
 
-                assert curIdx == curAbsWalIdx;
-
-                curAbsWalIdx++;
-
-                // Notify archiver thread.
-                notifyAll();
+                try {
+                    long nextIdx = segmentAware.nextAbsoluteSegmentIndex();
 
-                while (curAbsWalIdx - lastAbsArchivedIdx > 
dsCfg.getWalSegments() && cleanErr == null) {
-                    try {
+                    // Wait for formatter so that we do not open an empty file 
in DEFAULT mode.
+                    while (nextIdx % dsCfg.getWalSegments() > formatted && 
cleanErr == null)
                         wait();
 
-                        if (cleanErr != null)
-                            throw cleanErr;
-                    }
-                    catch (InterruptedException ignore) {
-                        interrupted.set(true);
-                    }
-                }
+                    if (cleanErr != null)
+                        throw cleanErr;
 
-                // Wait for formatter so that we do not open an empty file in 
DEFAULT mode.
-                while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && 
cleanErr == null)
-                    try {
-                        wait();
-
-                        if (cleanErr != null)
-                            throw cleanErr;
-                    }
-                    catch (InterruptedException ignore) {
-                        interrupted.set(true);
-                    }
+                    return nextIdx;
+                }
+                catch (IgniteInterruptedCheckedException e) {
+                    if (cleanErr != null)
+                        throw cleanErr;
 
-                return curAbsWalIdx;
+                    throw e;
+                }
+                catch (InterruptedException e) {
+                    throw new IgniteInterruptedCheckedException(e);
+                }
             }
         }
 
@@ -1890,55 +1815,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
          * release segment later, use {@link #releaseWorkSegment} for 
unlock</li> </ul>
          */
         @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-        private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
-            synchronized (this) {
-                if (lastAbsArchivedIdx >= absIdx) {
-                    if (log.isDebugEnabled())
-                        log.debug("Not needed to reserve WAL segment: absIdx=" 
+ absIdx + ";" +
-                            " lastAbsArchivedIdx=" + lastAbsArchivedIdx);
-
-                    return true;
-
-                }
-                Integer cur = locked.get(absIdx);
-
-                cur = cur == null ? 1 : cur + 1;
-
-                locked.put(absIdx, cur);
-
-                if (log.isDebugEnabled())
-                    log.debug("Reserved work segment [absIdx=" + absIdx + ", 
pins=" + cur + ']');
-
-                return false;
-            }
+        public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
+            return 
segmentAware.checkCanReadArchiveOrReserveWorkSegment(absIdx);
         }
 
         /**
          * @param absIdx Segment absolute index.
          */
         @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
-        private void releaseWorkSegment(long absIdx) {
-            synchronized (this) {
-                Integer cur = locked.get(absIdx);
-
-                assert cur != null && cur > 0 : "WAL Segment with Index " + 
absIdx + " is not locked;" +
-                    " lastAbsArchivedIdx = " + lastAbsArchivedIdx;
-
-                if (cur == 1) {
-                    locked.remove(absIdx);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Fully released work segment (ready to 
archive) [absIdx=" + absIdx + ']');
-                }
-                else {
-                    locked.put(absIdx, cur - 1);
-
-                    if (log.isDebugEnabled())
-                        log.debug("Partially released work segment [absIdx=" + 
absIdx + ", pins=" + (cur - 1) + ']');
-                }
-
-                notifyAll();
-            }
+        public void releaseWorkSegment(long absIdx) {
+            segmentAware.releaseWorkSegment(absIdx);
         }
 
         /**
@@ -2028,12 +1914,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         /** Current thread stopping advice. */
         private volatile boolean stopped;
 
-        /** Last successfully compressed segment. */
-        private volatile long lastCompressedIdx = -1L;
-
         /** All segments prior to this (inclusive) can be compressed. */
         private volatile long minUncompressedIdxToKeep = -1L;
-
         /**
          *
          */
@@ -2057,45 +1939,22 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             FileDescriptor[] alreadyCompressed = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
 
             if (alreadyCompressed.length > 0)
-                lastCompressedIdx = alreadyCompressed[alreadyCompressed.length 
- 1].idx();
+                
segmentAware.lastCompressedIdx(alreadyCompressed[alreadyCompressed.length - 
1].idx());
         }
 
         /**
          * @param idx Minimum raw segment index that should be preserved from 
deletion.
          */
-        synchronized void keepUncompressedIdxFrom(long idx) {
+        void keepUncompressedIdxFrom(long idx) {
             minUncompressedIdxToKeep = idx;
-
-            notify();
-        }
-
-        /**
-         * Callback for waking up compressor when new segment is archived.
-         */
-        synchronized void onNextSegmentArchived() {
-            notify();
         }
 
         /**
          * Pessimistically tries to reserve segment for compression in order 
to avoid concurrent truncation.
          * Waits if there's no segment to archive right now.
          */
-        private long tryReserveNextSegmentOrWait() throws 
InterruptedException, IgniteCheckedException {
-            long segmentToCompress = lastCompressedIdx + 1;
-
-            synchronized (this) {
-                if (stopped)
-                    return -1;
-
-                while (segmentToCompress > 
archivedMonitor.lastArchivedAbsoluteIndex()) {
-                    wait();
-
-                    if (stopped)
-                        return -1;
-                }
-            }
-
-            segmentToCompress = Math.max(segmentToCompress, 
lastTruncatedArchiveIdx + 1);
+        private long tryReserveNextSegmentOrWait() throws 
IgniteCheckedException {
+            long segmentToCompress = segmentAware.waitNextSegmentToCompress();
 
             boolean reserved = reserve(new FileWALPointer(segmentToCompress, 
0, 0));
 
@@ -2173,16 +2032,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         }
                     }
 
-                    lastCompressedIdx = currReservedSegment;
+                    segmentAware.lastCompressedIdx(currReservedSegment);
+                }
+                catch (IgniteInterruptedCheckedException ignore) {
+                    Thread.currentThread().interrupt();
                 }
                 catch (IgniteCheckedException | IOException e) {
                     U.error(log, "Compression of WAL segment [idx=" + 
currReservedSegment +
                         "] was skipped due to unexpected error", e);
 
-                    lastCompressedIdx++;
-                }
-                catch (InterruptedException ignore) {
-                    Thread.currentThread().interrupt();
+                    segmentAware.lastCompressedIdx(currReservedSegment);
                 }
                 finally {
                     if (currReservedSegment != -1)
@@ -2201,7 +2060,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             int segmentSerializerVer;
 
             try (FileIO fileIO = ioFactory.create(raw)) {
-                segmentSerializerVer = readSegmentHeader(fileIO, 
nextSegment).getSerializerVersion();
+                segmentSerializerVer = readSegmentHeader(new 
SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion();
             }
 
             try (ZipOutputStream zos = new ZipOutputStream(new 
BufferedOutputStream(new FileOutputStream(zip)))) {
@@ -2496,18 +2355,20 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
      */
     private abstract static class FileHandle {
         /** I/O interface for read/write operations with file */
-        FileIO fileIO;
-
-        /** Absolute WAL segment file index (incremental counter) */
-        protected final long idx;
+        SegmentIO fileIO;
 
         /**
-         * @param fileIO I/O interface for read/write operations of FileHandle.
-         * @param idx Absolute WAL segment file index (incremental counter).
+         * @param fileIO I/O interface for read/write operations of 
FileHandle.
          */
-        private FileHandle(FileIO fileIO, long idx) {
+        private FileHandle(SegmentIO fileIO) {
             this.fileIO = fileIO;
-            this.idx = idx;
+        }
+
+        /**
+         * @return Absolute WAL segment file index (incremental counter).
+         */
+        public long getSegmentId(){
+            return fileIO.getSegmentId();
         }
     }
 
@@ -2521,28 +2382,25 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         /** */
         FileInput in;
 
-        /**
-         * <code>true</code> if this file handle came from work directory. 
<code>false</code> if this file handle came
-         * from archive directory.
-         */
-        private boolean workDir;
+        /** Holder of actual information of latest manipulation on WAL 
segments. */
+        private final SegmentAware segmentAware;
 
         /**
          * @param fileIO I/O interface for read/write operations of FileHandle.
-         * @param idx Absolute WAL segment file index (incremental counter).
          * @param ser Entry serializer.
          * @param in File input.
+         * @param aware Segment aware.
          */
         public ReadFileHandle(
-            FileIO fileIO,
-            long idx,
+            SegmentIO fileIO,
             RecordSerializer ser,
-            FileInput in
-        ) {
-            super(fileIO, idx);
+            FileInput in,
+            SegmentAware aware) {
+            super(fileIO);
 
             this.ser = ser;
             this.in = in;
+            segmentAware = aware;
         }
 
         /**
@@ -2551,6 +2409,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         @Override public void close() throws IgniteCheckedException {
             try {
                 fileIO.close();
+
+                in.io().close();
             }
             catch (IOException e) {
                 throw new IgniteCheckedException(e);
@@ -2559,7 +2419,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override public long idx() {
-            return idx;
+            return getSegmentId();
         }
 
         /** {@inheritDoc} */
@@ -2574,7 +2434,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         /** {@inheritDoc} */
         @Override public boolean workDir() {
-            return workDir;
+            return segmentAware != null && 
segmentAware.lastArchivedAbsoluteIndex() < getSegmentId();
         }
     }
 
@@ -2618,7 +2478,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
         /**
          * @param fileIO I/O file interface to use
-         * @param idx Absolute WAL segment file index for easy access.
          * @param pos Position.
          * @param resume Created on resume logging flag.
          * @param serializer Serializer.
@@ -2626,14 +2485,13 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
          * @throws IOException If failed.
          */
         private FileWriteHandle(
-            FileIO fileIO,
-            long idx,
+            SegmentIO fileIO,
             long pos,
             boolean resume,
             RecordSerializer serializer,
             SegmentedRingByteBuffer buf
         ) throws IOException {
-            super(fileIO, idx);
+            super(fileIO);
 
             assert serializer != null;
 
@@ -2656,7 +2514,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             assert seg != null && seg.position() > 0;
 
-            prepareSerializerVersionBuffer(idx, serializerVersion(), false, 
seg.buffer());
+            prepareSerializerVersionBuffer(getSegmentId(), 
serializerVersion(), false, seg.buffer());
 
             seg.release();
         }
@@ -2692,7 +2550,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         if (buf == null)
                             return null; // Can not write to this segment, 
need to switch to the next one.
 
-                        ptr = new FileWALPointer(idx, pos, rec.size());
+                        ptr = new FileWALPointer(getSegmentId(), pos, 
rec.size());
 
                         rec.position(ptr);
 
@@ -2735,7 +2593,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         private void flushOrWait(FileWALPointer ptr) throws 
IgniteCheckedException {
             if (ptr != null) {
                 // If requested obsolete file index, it must be already 
flushed by close.
-                if (ptr.index() != idx)
+                if (ptr.index() != getSegmentId())
                     return;
             }
 
@@ -2752,7 +2610,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 return;
             }
 
-            assert ptr.index() == idx;
+            assert ptr.index() == getSegmentId();
 
             walWriter.flushBuffer(ptr.fileOffset());
         }
@@ -2781,7 +2639,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             // If index has changed, it means that the log was rolled over and 
already sync'ed.
             // If requested position is smaller than last sync'ed, it also 
means all is good.
             // If position is equal, then our record is the last not synced.
-            return idx == ptr.index() && lastFsyncPos <= ptr.fileOffset();
+            return getSegmentId() == ptr.index() && lastFsyncPos <= 
ptr.fileOffset();
         }
 
         /**
@@ -2791,7 +2649,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             lock.lock();
 
             try {
-                return new FileWALPointer(idx, (int)written, 0);
+                return new FileWALPointer(getSegmentId(), (int)written, 0);
             }
             finally {
                 lock.unlock();
@@ -2961,11 +2819,11 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                         }
                     }
                     catch (IOException e) {
-                        throw new StorageException("Failed to close WAL write 
handle [idx=" + idx + "]", e);
+                        throw new StorageException("Failed to close WAL write 
handle [idx=" + getSegmentId() + "]", e);
                     }
 
                     if (log.isDebugEnabled())
-                        log.debug("Closed WAL write handle [idx=" + idx + "]");
+                        log.debug("Closed WAL write handle [idx=" + 
getSegmentId() + "]");
 
                     return true;
                 }
@@ -2989,7 +2847,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             try {
                 assert cctx.kernalContext().invalid() ||
                     written == lastFsyncPos || mode != WALMode.FSYNC :
-                    "fsync [written=" + written + ", lastFsync=" + 
lastFsyncPos + ", idx=" + idx + ']';
+                    "fsync [written=" + written + ", lastFsync=" + 
lastFsyncPos + ", idx=" + getSegmentId() + ']';
 
                 fileIO = null;
 
@@ -3039,12 +2897,13 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
     private static class RecordsIterator extends AbstractWalRecordsIterator {
         /** */
         private static final long serialVersionUID = 0L;
-        /** */
-        private final File walWorkDir;
 
         /** */
         private final File walArchiveDir;
 
+        /** */
+        private final File walWorkDir;
+
         /** See {@link FileWriteAheadLogManager#archiver}. */
         @Nullable private final FileArchiver archiver;
 
@@ -3062,23 +2921,31 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         @Nullable
         private FileWALPointer end;
 
+        /** Manager of segment location. */
+        private SegmentRouter segmentRouter;
+
+        /** Holder of up-to-date information about the latest manipulations on WAL segments. */
+        private SegmentAware segmentAware;
+
         /**
          * @param cctx Shared context.
-         * @param walWorkDir WAL work dir.
          * @param walArchiveDir WAL archive dir.
+         * @param walWorkDir WAL work dir.
          * @param start Optional start pointer.
          * @param end Optional end pointer.
          * @param dsCfg Database configuration.
          * @param serializerFactory Serializer factory.
          * @param archiver File Archiver.
          * @param decompressor Decompressor.
-         * @param log Logger
-         * @throws IgniteCheckedException If failed to initialize WAL segment.
+         * @param log Logger.
+         * @param segmentAware Segment aware.
+         * @param segmentRouter Segment router.
+         * @param segmentFileInputFactory Segment file input factory.
+         * @throws IgniteCheckedException If failed to initialize WAL segment.
          */
         private RecordsIterator(
             GridCacheSharedContext cctx,
-            File walWorkDir,
             File walArchiveDir,
+            File walWorkDir,
             @Nullable FileWALPointer start,
             @Nullable FileWALPointer end,
             DataStorageConfiguration dsCfg,
@@ -3086,20 +2953,28 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             FileIOFactory ioFactory,
             @Nullable FileArchiver archiver,
             FileDecompressor decompressor,
-            IgniteLogger log
+            IgniteLogger log,
+            SegmentAware segmentAware,
+            SegmentRouter segmentRouter,
+            SegmentFileInputFactory segmentFileInputFactory
         ) throws IgniteCheckedException {
             super(log,
                 cctx,
                 serializerFactory,
                 ioFactory,
-                dsCfg.getWalRecordIteratorBufferSize());
-            this.walWorkDir = walWorkDir;
+                dsCfg.getWalRecordIteratorBufferSize(),
+                segmentFileInputFactory
+            );
             this.walArchiveDir = walArchiveDir;
-            this.dsCfg = dsCfg;
+            this.walWorkDir = walWorkDir;
             this.archiver = archiver;
             this.start = start;
             this.end = end;
+            this.dsCfg = dsCfg;
+
             this.decompressor = decompressor;
+            this.segmentRouter = segmentRouter;
+            this.segmentAware = segmentAware;
 
             init();
 
@@ -3138,10 +3013,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             curRec = null;
 
-            final AbstractReadFileHandle handle = closeCurrentWalSegment();
-
-            if (handle != null && handle.workDir())
-                releaseWorkSegment(curWalSegmIdx);
+            closeCurrentWalSegment();
 
             curWalSegmIdx = Integer.MAX_VALUE;
         }
@@ -3196,42 +3068,24 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         @Override protected AbstractReadFileHandle advanceSegment(
             @Nullable final AbstractReadFileHandle curWalSegment
         ) throws IgniteCheckedException {
-            if (curWalSegment != null) {
+            if (curWalSegment != null)
                 curWalSegment.close();
 
-                if (curWalSegment.workDir())
-                    releaseWorkSegment(curWalSegment.idx());
-
-            }
-
             // We are past the end marker.
             if (end != null && curWalSegmIdx + 1 > end.index())
                 return null; //stop iteration
 
             curWalSegmIdx++;
 
-            FileDescriptor fd;
-
-            boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx);
-
-            if (archiver == null || readArchive) {
-                fd = new FileDescriptor(new File(walArchiveDir,
-                    FileDescriptor.fileName(curWalSegmIdx)));
-            }
-            else {
-                long workIdx = curWalSegmIdx % dsCfg.getWalSegments();
-
-                fd = new FileDescriptor(
-                    new File(walWorkDir, FileDescriptor.fileName(workIdx)),
-                    curWalSegmIdx);
-            }
-
-            if (log.isDebugEnabled())
-                log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", 
file=" + fd.file.getAbsolutePath() + ']');
+            boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); 
//lock during creation handle.
 
             ReadFileHandle nextHandle;
-
             try {
+                FileDescriptor fd = segmentRouter.findSegment(curWalSegmIdx);
+
+                if (log.isDebugEnabled())
+                    log.debug("Reading next file [absIdx=" + curWalSegmIdx + 
", file=" + fd.file.getAbsolutePath() + ']');
+
                 nextHandle = initReadHandle(fd, start != null && curWalSegmIdx 
== start.index() ? start : null);
             }
             catch (FileNotFoundException e) {
@@ -3241,12 +3095,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     nextHandle = null;
             }
 
-            if (nextHandle == null) {
-                if (!readArchive)
-                    releaseWorkSegment(curWalSegmIdx);
-            }
-            else
-                nextHandle.workDir = !readArchive;
+            if (!readArchive)
+                releaseWorkSegment(curWalSegmIdx);
 
             curRec = null;
 
@@ -3316,9 +3166,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /** {@inheritDoc} */
-        @Override protected AbstractReadFileHandle createReadFileHandle(FileIO 
fileIO, long idx,
+        @Override protected AbstractReadFileHandle 
createReadFileHandle(SegmentIO fileIO,
             RecordSerializer ser, FileInput in) {
-            return new ReadFileHandle(fileIO, idx, ser, in);
+            return new ReadFileHandle(fileIO, ser, in, segmentAware);
         }
     }
 

Reply via email to