IGNITE-8559 Fix: WAL rollOver can be blocked by WAL iterator reservation - Fixes #4449.
Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/2f72fe75 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/2f72fe75 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/2f72fe75 Branch: refs/heads/ignite-gg-14206 Commit: 2f72fe758d4256c4eb4610e5922ad3d174b43dc5 Parents: 1573f45 Author: Anton Kalashnikov <kaa....@yandex.ru> Authored: Tue Sep 25 13:50:35 2018 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Tue Sep 25 13:50:35 2018 +0300 ---------------------------------------------------------------------- .../cache/persistence/file/FileIOFactory.java | 1 - .../wal/AbstractWalRecordsIterator.java | 50 +- .../cache/persistence/wal/FileDescriptor.java | 17 +- .../cache/persistence/wal/FileInput.java | 481 --------------- .../wal/FileWriteAheadLogManager.java | 544 ++++++----------- .../wal/FsyncModeFileWriteAheadLogManager.java | 108 ++-- .../persistence/wal/SegmentArchivedMonitor.java | 64 -- .../wal/SegmentReservationStorage.java | 61 -- .../cache/persistence/wal/SegmentRouter.java | 90 +++ .../SingleSegmentLogicalRecordsIterator.java | 11 +- .../wal/aware/SegmentArchivedStorage.java | 137 +++++ .../persistence/wal/aware/SegmentAware.java | 234 ++++++++ .../wal/aware/SegmentCompressStorage.java | 116 ++++ .../wal/aware/SegmentCurrentStateStorage.java | 171 ++++++ .../wal/aware/SegmentLockStorage.java | 76 +++ .../wal/aware/SegmentObservable.java | 46 ++ .../wal/aware/SegmentReservationStorage.java | 62 ++ .../cache/persistence/wal/io/FileInput.java | 258 ++++++++ .../persistence/wal/io/LockedReadFileInput.java | 111 ++++ .../wal/io/LockedSegmentFileInputFactory.java | 68 +++ .../wal/io/SegmentFileInputFactory.java | 34 ++ .../cache/persistence/wal/io/SegmentIO.java | 45 ++ .../persistence/wal/io/SimpleFileInput.java | 272 +++++++++ .../wal/io/SimpleSegmentFileInputFactory.java | 33 + 
.../wal/reader/IgniteWalIteratorFactory.java | 13 +- .../reader/StandaloneWalRecordsIterator.java | 23 +- .../wal/serializer/RecordSerializer.java | 2 +- .../wal/serializer/RecordV1Serializer.java | 16 +- .../wal/serializer/RecordV2Serializer.java | 2 +- .../wal/IgniteWalHistoryReservationsTest.java | 87 ++- .../wal/IgniteWalIteratorSwitchSegmentTest.java | 124 +++- .../db/wal/crc/IgniteDataIntegrityTests.java | 7 +- .../persistence/wal/aware/SegmentAwareTest.java | 601 +++++++++++++++++++ .../StandaloneWalRecordsIteratorTest.java | 2 +- .../ignite/testframework/GridTestUtils.java | 108 ++-- .../ignite/testsuites/IgnitePdsTestSuite.java | 4 + .../db/wal/IgniteWalRecoveryTest.java | 139 ++++- 37 files changed, 3041 insertions(+), 1177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java index c3a75f5..2735185 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIOFactory.java @@ -44,5 +44,4 @@ public interface FileIOFactory extends Serializable { * @throws IOException If I/O interface creation was failed. */ public FileIO create(File file, OpenOption... 
modes) throws IOException; - } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java index aa8eb31..3cbe577 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -30,7 +30,9 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader; @@ -87,24 +89,29 @@ public abstract class AbstractWalRecordsIterator /** Utility buffer for reading records */ private final ByteBufferExpander buf; + /** Factory to provide I/O interfaces for read primitives with files. 
*/ + private final SegmentFileInputFactory segmentFileInputFactory; + /** * @param log Logger. * @param sharedCtx Shared context. * @param serializerFactory Serializer of current version to read headers. * @param ioFactory ioFactory for file IO access. * @param initialReadBufferSize buffer for reading records size. + * @param segmentFileInputFactory Factory to provide I/O interfaces for read primitives with files. */ protected AbstractWalRecordsIterator( @NotNull final IgniteLogger log, @NotNull final GridCacheSharedContext sharedCtx, @NotNull final RecordSerializerFactory serializerFactory, @NotNull final FileIOFactory ioFactory, - final int initialReadBufferSize - ) { + final int initialReadBufferSize, + SegmentFileInputFactory segmentFileInputFactory) { this.log = log; this.sharedCtx = sharedCtx; this.serializerFactory = serializerFactory; this.ioFactory = ioFactory; + this.segmentFileInputFactory = segmentFileInputFactory; buf = new ByteBufferExpander(initialReadBufferSize, ByteOrder.nativeOrder()); } @@ -134,11 +141,8 @@ public abstract class AbstractWalRecordsIterator } /** - * Switches records iterator to the next record. - * <ul> - * <li>{@link #curRec} will be updated.</li> - * <li> If end of segment reached, switch to new segment is called. {@link #currWalSegment} will be updated.</li> - * </ul> + * Switches records iterator to the next record. <ul> <li>{@link #curRec} will be updated.</li> <li> If end of + * segment reached, switch to new segment is called. 
{@link #currWalSegment} will be updated.</li> </ul> * * {@code advance()} runs a step ahead {@link #next()} * @@ -303,16 +307,16 @@ public abstract class AbstractWalRecordsIterator protected AbstractReadFileHandle initReadHandle( @NotNull final AbstractFileDescriptor desc, @Nullable final FileWALPointer start, - @NotNull final FileIO fileIO, + @NotNull final SegmentIO fileIO, @NotNull final SegmentHeader segmentHeader ) throws IgniteCheckedException { try { - final boolean isCompacted = segmentHeader.isCompacted(); + boolean isCompacted = segmentHeader.isCompacted(); if (isCompacted) serializerFactory.skipPositionCheck(true); - FileInput in = new FileInput(fileIO, buf); + FileInput in = segmentFileInputFactory.createFileInput(fileIO, buf); if (start != null && desc.idx() == start.index()) { if (isCompacted) { @@ -329,7 +333,7 @@ public abstract class AbstractWalRecordsIterator int serVer = segmentHeader.getSerializerVersion(); - return createReadFileHandle(fileIO, desc.idx(), serializerFactory.createSerializer(serVer), in); + return createReadFileHandle(fileIO, serializerFactory.createSerializer(serVer), in); } catch (SegmentEofException | EOFException ignore) { try { @@ -368,15 +372,15 @@ public abstract class AbstractWalRecordsIterator @NotNull final AbstractFileDescriptor desc, @Nullable final FileWALPointer start ) throws IgniteCheckedException, FileNotFoundException { - FileIO fileIO = null; + SegmentIO fileIO = null; try { - fileIO = desc.isCompressed() ? 
new UnzipFileIO(desc.file()) : ioFactory.create(desc.file()); + fileIO = desc.toIO(ioFactory); SegmentHeader segmentHeader; try { - segmentHeader = readSegmentHeader(fileIO, curWalSegmIdx); + segmentHeader = readSegmentHeader(fileIO, segmentFileInputFactory); } catch (SegmentEofException | EOFException ignore) { try { @@ -411,8 +415,7 @@ public abstract class AbstractWalRecordsIterator /** */ protected abstract AbstractReadFileHandle createReadFileHandle( - FileIO fileIO, - long idx, + SegmentIO fileIO, RecordSerializer ser, FileInput in ); @@ -460,7 +463,9 @@ public abstract class AbstractWalRecordsIterator /** */ RecordSerializer ser(); - /** */ + /** + * + */ boolean workDir(); } @@ -474,5 +479,14 @@ public abstract class AbstractWalRecordsIterator /** */ long idx(); + + /** + * Make fileIo by this description. + * + * @param fileIOFactory Factory for fileIo creation. + * @return One of implementation of {@link FileIO}. + * @throws IOException if creation of fileIo was not success. + */ + SegmentIO toIO(FileIOFactory fileIOFactory) throws IOException; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java index a73248a..f265376 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileDescriptor.java @@ -17,15 +17,17 @@ package org.apache.ignite.internal.processors.cache.persistence.wal; +import java.io.File; +import java.io.IOException; +import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.file.UnzipFileIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; import org.apache.ignite.internal.util.typedef.internal.SB; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; -import java.io.File; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - /** * WAL file descriptor. */ @@ -134,4 +136,11 @@ public class FileDescriptor implements Comparable<FileDescriptor>, AbstractWalRe @Override public long idx() { return idx; } + + /** {@inheritDoc} */ + @Override public SegmentIO toIO(FileIOFactory fileIOFactory) throws IOException { + FileIO fileIO = isCompressed() ? new UnzipFileIO(file()) : fileIOFactory.create(file()); + + return new SegmentIO(idx, fileIO); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java deleted file mode 100644 index 303a023..0000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java +++ /dev/null @@ -1,481 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.persistence.wal; - -import java.io.EOFException; -import java.io.IOException; -import java.nio.ByteBuffer; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; -import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; -import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; -import org.jetbrains.annotations.NotNull; - -/** - * File input, backed by byte buffer file input. - * This class allows to read data by chunks from file and then read primitives - */ -public final class FileInput implements ByteBufferBackedDataInput { - /** - * Buffer for reading blocks of data into. - * <b>Note:</b> biggest block requested from this input can't be longer than buffer capacity - */ - private ByteBuffer buf; - - /** I/O interface for read/write operations with file */ - private FileIO io; - - /** */ - private long pos; - - /** */ - private ByteBufferExpander expBuf; - - /** - * @param io FileIO to read from. - * @param buf Buffer for reading blocks of data into. - */ - public FileInput(FileIO io, ByteBufferExpander buf) throws IOException { - assert io != null; - - this.io = io; - this.buf = buf.buffer(); - - expBuf = buf; - - pos = io.position(); - - clearBuffer(); - } - - /** - * File I/O. 
- */ - public FileIO io() { - return io; - } - - /** - * Clear buffer. - */ - private void clearBuffer() { - buf.clear(); - buf.limit(0); - - assert buf.remaining() == 0; // Buffer is empty. - } - - /** - * @param pos Position in bytes from file begin. - */ - public void seek(long pos) throws IOException { - if (pos > io.size()) - throw new EOFException(); - - io.position(pos); - - this.pos = pos; - - clearBuffer(); - } - - /** - * @return Underlying buffer. - */ - @Override public ByteBuffer buffer() { - return buf; - } - - - /** {@inheritDoc} */ - @Override public void ensure(int requested) throws IOException { - int available = buf.remaining(); - - if (available >= requested) - return; - - if (buf.capacity() < requested) { - if (expBuf == null) - throw new IOException("Requested size is greater than buffer: " + requested); - - buf = expBuf.expand(requested); - - assert available == buf.remaining(); - } - - buf.compact(); - - do { - int read = io.read(buf); - - if (read == -1) - throw new EOFException("EOF at position [" + io.position() + "] expected to read [" + requested + "] bytes"); - - available += read; - - pos += read; - } - while (available < requested); - - buf.flip(); - } - - /** - * @return Position in the stream. 
- */ - public long position() { - return pos - buf.remaining(); - } - - /** - * {@inheritDoc} - */ - @Override public void readFully(@NotNull byte[] b) throws IOException { - ensure(b.length); - - buf.get(b); - } - - /** - * {@inheritDoc} - */ - @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException { - ensure(len); - - buf.get(b, off, len); - } - - /** - * {@inheritDoc} - */ - @Override public int skipBytes(int n) throws IOException { - if (buf.remaining() >= n) - buf.position(buf.position() + n); - else - seek(pos + n); - - return n; - } - - /** - * {@inheritDoc} - */ - @Override public boolean readBoolean() throws IOException { - return readByte() == 1; - } - - /** - * {@inheritDoc} - */ - @Override public byte readByte() throws IOException { - ensure(1); - - return buf.get(); - } - - /** - * {@inheritDoc} - */ - @Override public int readUnsignedByte() throws IOException { - return readByte() & 0xFF; - } - - /** - * {@inheritDoc} - */ - @Override public short readShort() throws IOException { - ensure(2); - - return buf.getShort(); - } - - /** - * {@inheritDoc} - */ - @Override public int readUnsignedShort() throws IOException { - return readShort() & 0xFFFF; - } - - /** - * {@inheritDoc} - */ - @Override public char readChar() throws IOException { - ensure(2); - - return buf.getChar(); - } - - /** - * {@inheritDoc} - */ - @Override public int readInt() throws IOException { - ensure(4); - - return buf.getInt(); - } - - /** - * {@inheritDoc} - */ - @Override public long readLong() throws IOException { - ensure(8); - - return buf.getLong(); - } - - /** - * {@inheritDoc} - */ - @Override public float readFloat() throws IOException { - ensure(4); - - return buf.getFloat(); - } - - /** - * {@inheritDoc} - */ - @Override public double readDouble() throws IOException { - ensure(8); - - return buf.getDouble(); - } - - /** - * {@inheritDoc} - */ - @Override public String readLine() throws IOException { - throw new 
UnsupportedOperationException(); - } - - /** - * {@inheritDoc} - */ - @Override public String readUTF() throws IOException { - throw new UnsupportedOperationException(); - } - - /** - * @param skipCheck If CRC check should be skipped. - * @return autoclosable fileInput, after its closing crc32 will be calculated and compared with saved one - */ - public Crc32CheckingFileInput startRead(boolean skipCheck) { - return new Crc32CheckingFileInput(buf.position(), skipCheck); - } - - /** - * Checking of CRC32. - */ - public class Crc32CheckingFileInput implements ByteBufferBackedDataInput, AutoCloseable { - /** */ - private final PureJavaCrc32 crc32 = new PureJavaCrc32(); - - /** Last calc position. */ - private int lastCalcPosition; - - /** Skip crc check. */ - private boolean skipCheck; - - /** - * @param position Position. - */ - public Crc32CheckingFileInput(int position, boolean skipCheck) { - this.lastCalcPosition = position; - this.skipCheck = skipCheck; - } - - /** {@inheritDoc} */ - @Override public void ensure(int requested) throws IOException { - int available = buf.remaining(); - - if (available >= requested) - return; - - updateCrc(); - - FileInput.this.ensure(requested); - - lastCalcPosition = 0; - } - - /** {@inheritDoc} */ - @Override public void close() throws Exception { - updateCrc(); - - int val = crc32.getValue(); - - int writtenCrc = this.readInt(); - - if ((val ^ writtenCrc) != 0 && !skipCheck) { - // If it last message we will skip it (EOF will be thrown). 
- ensure(5); - - throw new IgniteDataIntegrityViolationException( - "val: " + val + " writtenCrc: " + writtenCrc - ); - } - } - - /** - * - */ - private void updateCrc() { - if (skipCheck) - return; - - int oldPos = buf.position(); - - buf.position(lastCalcPosition); - - crc32.update(buf, oldPos - lastCalcPosition); - - lastCalcPosition = oldPos; - } - - /** {@inheritDoc} */ - @Override public int skipBytes(int n) throws IOException { - ensure(n); - - int skipped = Math.min(buf.remaining(), n); - - buf.position(buf.position() + skipped); - - return skipped; - } - - /** - * {@inheritDoc} - */ - @Override public void readFully(@NotNull byte[] b) throws IOException { - ensure(b.length); - - buf.get(b); - } - - /** - * {@inheritDoc} - */ - @Override public void readFully(@NotNull byte[] b, int off, int len) throws IOException { - ensure(len); - - buf.get(b, off, len); - } - - /** - * {@inheritDoc} - */ - @Override public boolean readBoolean() throws IOException { - return readByte() == 1; - } - - /** - * {@inheritDoc} - */ - @Override public byte readByte() throws IOException { - ensure(1); - - return buf.get(); - } - - /** - * {@inheritDoc} - */ - @Override public int readUnsignedByte() throws IOException { - return readByte() & 0xFF; - } - - /** - * {@inheritDoc} - */ - @Override public short readShort() throws IOException { - ensure(2); - - return buf.getShort(); - } - - /** - * {@inheritDoc} - */ - @Override public int readUnsignedShort() throws IOException { - return readShort() & 0xFFFF; - } - - /** - * {@inheritDoc} - */ - @Override public char readChar() throws IOException { - ensure(2); - - return buf.getChar(); - } - - /** - * {@inheritDoc} - */ - @Override public int readInt() throws IOException { - ensure(4); - - return buf.getInt(); - } - - /** - * {@inheritDoc} - */ - @Override public long readLong() throws IOException { - ensure(8); - - return buf.getLong(); - } - - /** - * {@inheritDoc} - */ - @Override public float readFloat() throws IOException { - 
ensure(4); - - return buf.getFloat(); - } - - /** - * {@inheritDoc} - */ - @Override public double readDouble() throws IOException { - ensure(8); - - return buf.getDouble(); - } - - /** - * {@inheritDoc} - */ - @Override public String readLine() throws IOException { - throw new UnsupportedOperationException(); - } - - /** - * {@inheritDoc} - */ - @Override public String readUTF() throws IOException { - throw new UnsupportedOperationException(); - } - - /** {@inheritDoc} */ - @Override public ByteBuffer buffer() { - return FileInput.this.buffer(); - } - } -} http://git-wip-us.apache.org/repos/asf/ignite/blob/2f72fe75/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index f53c02f..8765309 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -97,7 +97,13 @@ import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccess import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; +import 
org.apache.ignite.internal.processors.cache.persistence.wal.io.LockedSegmentFileInputFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; @@ -251,13 +257,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE = IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, 0.5); - /** Interrupted flag. */ - private final ThreadLocal<Boolean> interrupted = new ThreadLocal<Boolean>() { - @Override protected Boolean initialValue() { - return false; - } - }; - /** */ private final boolean alwaysWriteFullPages; @@ -310,17 +309,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final int serializerVer = IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION); - /** Latest segment cleared by {@link #truncate(WALPointer, WALPointer)}. */ - private volatile long lastTruncatedArchiveIdx = -1L; - /** Factory to provide I/O interfaces for read/write operations with files */ private volatile FileIOFactory ioFactory; - /** Next WAL segment archived monitor. Manages last archived index, emulates archivation in no-archiver mode. 
*/ - private final SegmentArchivedMonitor archivedMonitor = new SegmentArchivedMonitor(); + /** Factory to provide I/O interfaces for read primitives with files */ + private final SegmentFileInputFactory segmentFileInputFactory; - /** Segment reservations storage: Protects WAL segments from deletion during WAL log cleanup. */ - private final SegmentReservationStorage reservationStorage = new SegmentReservationStorage(); + /** Holder of actual information of latest manipulation on WAL segments. */ + private final SegmentAware segmentAware; /** Updater for {@link #currHnd}, used for verify there are no concurrent update for current log segment handle */ private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> CURR_HND_UPD = @@ -384,6 +380,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private WalSegmentSyncer walSegmentSyncWorker; /** + * Manage of segment location. + */ + private SegmentRouter segmentRouter; + + /** Segment factory with ability locked segment during reading. */ + private SegmentFileInputFactory lockedSegmentFileInputFactory; + + /** * @param ctx Kernal context. 
*/ public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) { @@ -401,6 +405,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl fsyncDelay = dsCfg.getWalFsyncDelayNanos(); alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages(); ioFactory = new RandomAccessFileIOFactory(); + segmentFileInputFactory = new SimpleSegmentFileInputFactory(); walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity(); maxSegCountWithoutCheckpoint = @@ -410,6 +415,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl evt = ctx.event(); failureProcessor = ctx.failure(); + segmentAware = new SegmentAware(dsCfg.getWalSegments()); } /** @@ -467,7 +473,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices(); - lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1; + segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() - 1); long lastAbsArchivedIdx = tup == null ? -1 : tup.get2(); @@ -477,7 +483,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl archiver = null; if (lastAbsArchivedIdx > 0) - archivedMonitor.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); + segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); if (dsCfg.isWalCompactionEnabled()) { compressor = new FileCompressor(); @@ -489,6 +495,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } + segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg); + walDisableContext = cctx.walState().walDisableContext(); if (mode != WALMode.NONE && mode != WALMode.FSYNC) { @@ -501,6 +509,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl else U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " + "a case of unexpected node failure. 
Make sure to deactivate the cluster before shutdown."); + + lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory( + segmentAware, + segmentRouter, + ioFactory + ); } } @@ -525,7 +539,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException { final long awaitIdx = high.index() - 1; - archivedMonitor.awaitSegmentArchived(awaitIdx); + segmentAware.awaitSegmentArchived(awaitIdx); if (!reserve(low)) throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]"); @@ -598,6 +612,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (walWriter != null) walWriter.shutdown(); + segmentAware.interrupt(); + if (archiver != null) archiver.shutdown(); @@ -792,14 +808,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (rec.rollOver()) { assert cctx.database().checkpointLockIsHeldByThread(); - long idx = currWrHandle.idx; + long idx = currWrHandle.getSegmentId(); currWrHandle.buf.close(); currWrHandle = rollOver(currWrHandle); if (log != null && log.isInfoEnabled()) - log.info("Rollover segment [" + idx + " to " + currWrHandle.idx + "], recordType=" + rec.type()); + log.info("Rollover segment [" + idx + " to " + currWrHandle.getSegmentId() + "], recordType=" + rec.type()); } WALPointer ptr = currWrHandle.addRecord(rec); @@ -863,8 +879,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return new RecordsIterator( cctx, - walWorkDir, walArchiveDir, + walWorkDir, (FileWALPointer)start, end, dsCfg, @@ -872,8 +888,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl ioFactory, archiver, decompressor, - log - ); + log, + segmentAware, + segmentRouter, + lockedSegmentFileInputFactory); } /** {@inheritDoc} */ @@ -883,10 +901,10 @@ public class 
FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (mode == WALMode.NONE) return false; - reservationStorage.reserve(((FileWALPointer)start).index()); + segmentAware.reserve(((FileWALPointer)start).index()); if (!hasIndex(((FileWALPointer)start).index())) { - reservationStorage.release(((FileWALPointer)start).index()); + segmentAware.release(((FileWALPointer)start).index()); return false; } @@ -901,7 +919,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (mode == WALMode.NONE) return; - reservationStorage.release(((FileWALPointer)start).index()); + segmentAware.release(((FileWALPointer)start).index()); } /** @@ -924,7 +942,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileWriteHandle cur = currHnd; - return cur != null && cur.idx >= absIdx; + return cur != null && cur.getSegmentId() >= absIdx; } /** {@inheritDoc} */ @@ -950,7 +968,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (segmentReservedOrLocked(desc.idx)) return deleted; - long archivedAbsIdx = archivedMonitor.lastArchivedAbsoluteIndex(); + long archivedAbsIdx = segmentAware.lastArchivedAbsoluteIndex(); long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : lastArchivedIndex(); @@ -963,8 +981,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl deleted++; // Bump up the oldest archive segment index. 
- if (lastTruncatedArchiveIdx < desc.idx) - lastTruncatedArchiveIdx = desc.idx; + if (segmentAware.lastTruncatedArchiveIdx() < desc.idx) + segmentAware.lastTruncatedArchiveIdx(desc.idx); } } @@ -981,9 +999,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private boolean segmentReservedOrLocked(long absIdx) { FileArchiver archiver0 = archiver; - return ((archiver0 != null) && archiver0.locked(absIdx)) - || (reservationStorage.reserved(absIdx)); - + return ((archiver0 != null) && segmentAware.locked(absIdx)) || (segmentAware.reserved(absIdx)); } /** {@inheritDoc} */ @@ -994,9 +1010,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public int walArchiveSegments() { - long lastTruncated = lastTruncatedArchiveIdx; + long lastTruncated = segmentAware.lastTruncatedArchiveIdx(); - long lastArchived = archivedMonitor.lastArchivedAbsoluteIndex(); + long lastArchived = segmentAware.lastArchivedAbsoluteIndex(); if (lastArchived == -1) return 0; @@ -1008,12 +1024,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public long lastArchivedSegment() { - return archivedMonitor.lastArchivedAbsoluteIndex(); + return segmentAware.lastArchivedAbsoluteIndex(); } /** {@inheritDoc} */ @Override public long lastCompactedSegment() { - return compressor != null ? 
compressor.lastCompressedIdx : -1L; + return segmentAware.lastCompressedIdx(); } /** {@inheritDoc} */ @@ -1170,7 +1186,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl next.writeHeader(); - if (next.idx - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) + if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) cctx.database().forceCheckpoint("too big size of WAL without checkpoint"); boolean swapped = CURR_HND_UPD.compareAndSet(this, hnd, next); @@ -1216,7 +1232,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int len = lastReadPtr == null ? 0 : lastReadPtr.length(); try { - FileIO fileIO = ioFactory.create(curFile); + SegmentIO fileIO = new SegmentIO(absIdx, ioFactory.create(curFile)); IgniteInClosure<FileIO> lsnr = createWalFileListener; @@ -1229,7 +1245,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // If we have existing segment, try to read version from it. 
if (lastReadPtr != null) { try { - serVer = readSegmentHeader(fileIO, absIdx).getSerializerVersion(); + serVer = readSegmentHeader(fileIO, segmentFileInputFactory).getSerializerVersion(); } catch (SegmentEofException | EOFException ignore) { serVer = serializerVer; @@ -1257,16 +1273,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileWriteHandle hnd = new FileWriteHandle( fileIO, - absIdx, off + len, true, ser, rbuf); if (archiver0 != null) - archiver0.currentWalIndex(absIdx); + segmentAware.curAbsWalIdx(absIdx); else - archivedMonitor.setLastArchivedAbsoluteIndex(absIdx - 1); + segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1); return hnd; } @@ -1301,22 +1316,22 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl IgniteCheckedException error = null; try { - File nextFile = pollNextFile(cur.idx); + File nextFile = pollNextFile(cur.getSegmentId()); if (log.isDebugEnabled()) log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath()); SegmentedRingByteBuffer rbuf = null; - FileIO fileIO = null; + SegmentIO fileIO = null; FileWriteHandle hnd; - boolean interrupted = this.interrupted.get(); + boolean interrupted = false; while (true) { try { - fileIO = ioFactory.create(nextFile); + fileIO = new SegmentIO(cur.getSegmentId() + 1, ioFactory.create(nextFile)); IgniteInClosure<FileIO> lsnr = createWalFileListener; if (lsnr != null) @@ -1332,7 +1347,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl hnd = new FileWriteHandle( fileIO, - cur.idx + 1, 0, false, serializer, @@ -1365,9 +1379,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl rbuf = null; } } - finally { - this.interrupted.set(false); - } } return hnd; @@ -1521,17 +1532,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @return File ready for use as new WAL segment. 
* @throws StorageException If exception occurred in the archiver thread. */ - private File pollNextFile(long curIdx) throws StorageException { + private File pollNextFile(long curIdx) throws StorageException, IgniteInterruptedCheckedException { FileArchiver archiver0 = archiver; if (archiver0 == null) { - archivedMonitor.setLastArchivedAbsoluteIndex(curIdx); + segmentAware.setLastArchivedAbsoluteIndex(curIdx); return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1)); } // Signal to archiver that we are done with the segment and it can be archived. - long absNextIdx = archiver0.nextAbsoluteSegmentIndex(curIdx); + long absNextIdx = archiver0.nextAbsoluteSegmentIndex(); long segmentIdx = absNextIdx % dsCfg.getWalSegments(); @@ -1624,23 +1635,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * Monitor of current object is used for notify on: <ul> <li>exception occurred ({@link * FileArchiver#cleanErr}!=null)</li> <li>stopping thread ({@link FileArchiver#stopped}==true)</li> <li>current file - * index changed ({@link FileArchiver#curAbsWalIdx})</li> <li>last archived file index was changed ({@link - * FileArchiver#lastAbsArchivedIdx})</li> <li>some WAL index was removed from {@link FileArchiver#locked} map</li> + * index changed </li> <li>last archived file index was changed ({@link + * </li> <li>some WAL index was removed from map</li> * </ul> */ private class FileArchiver extends GridWorker { /** Exception which occurred during initial creation of files or during archiving WAL segment */ private StorageException cleanErr; - /** - * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. Incremented during - * rollover. Also may be directly set if WAL is resuming logging after start. - */ - private long curAbsWalIdx = -1; - - /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. 
*/ - private volatile long lastAbsArchivedIdx = -1; - /** current thread stopping advice */ private volatile boolean stopped; @@ -1648,19 +1650,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private int formatted; /** - * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may come - * from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>. - */ - private Map<Long, Integer> locked = new HashMap<>(); - - /** * */ private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) { super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log, cctx.kernalContext().workersRegistry()); - this.lastAbsArchivedIdx = lastAbsArchivedIdx; + segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx); } /** @@ -1676,27 +1672,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl U.join(runner()); } - /** - * @param curAbsWalIdx Current absolute WAL segment index. - */ - private void currentWalIndex(long curAbsWalIdx) { - synchronized (this) { - this.curAbsWalIdx = curAbsWalIdx; - - notifyAll(); - } - } - - /** - * Check if WAL segment locked (protected from move to archive) - * - * @param absIdx Index for check reservation. - * @return {@code True} if index is locked. - */ - private synchronized boolean locked(long absIdx) { - return locked.containsKey(absIdx); - } - /** {@inheritDoc} */ @Override protected void body() { blockingSectionBegin(); @@ -1709,6 +1684,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // Stop the thread and report to starter. 
cleanErr = e; + segmentAware.forceInterrupt(); + notifyAll(); } @@ -1723,44 +1700,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Throwable err = null; try { - synchronized (this) { - while (curAbsWalIdx == -1 && !stopped) { - blockingSectionBegin(); - - try { - wait(); - } - finally { - blockingSectionEnd(); - } - } - - // If the archive directory is empty, we can be sure that there were no WAL segments archived. - // This is ensured by the check in truncate() which will leave at least one file there - // once it was archived. + blockingSectionBegin(); + try { + segmentAware.awaitSegment(0);//wait for init at least one work segments. + } + finally { + blockingSectionEnd(); } - while (!Thread.currentThread().isInterrupted() && !stopped) { long toArchive; - synchronized (this) { - assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx + - ", current=" + curAbsWalIdx; - - while (lastAbsArchivedIdx >= curAbsWalIdx - 1 && !stopped) { - blockingSectionBegin(); - - try { - wait(); - } - finally { - blockingSectionEnd(); - } - } + blockingSectionBegin(); - toArchive = lastAbsArchivedIdx + 1; + try { + toArchive = segmentAware.waitNextSegmentForArchivation(); + } + finally { + blockingSectionEnd(); } - if (stopped) break; @@ -1775,40 +1732,32 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl blockingSectionEnd(); } - synchronized (this) { - while (locked.containsKey(toArchive) && !stopped) { - blockingSectionBegin(); - - try { - wait(); - } - finally { - blockingSectionEnd(); - } - } - - // Then increase counter to allow rollover on clean working file - changeLastArchivedIndexAndNotifyWaiters(toArchive); + blockingSectionBegin(); - notifyAll(); + try { + segmentAware.markAsMovedToArchive(toArchive); + } + finally { + blockingSectionEnd(); } if (evt.isRecordable(EVT_WAL_SEGMENT_ARCHIVED)) { evt.record(new WalSegmentArchivedEvent( - cctx.discovery().localNode(), - 
res.getAbsIdx(), - res.getDstArchiveFile()) + cctx.discovery().localNode(), + res.getAbsIdx(), + res.getDstArchiveFile()) ); } onIdle(); } } - catch (InterruptedException t) { + catch (IgniteInterruptedCheckedException e) { Thread.currentThread().interrupt(); - if (!stopped) - err = t; + synchronized (this) { + stopped = true; + } } catch (Throwable t) { err = t; @@ -1825,62 +1774,38 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param idx Index. - */ - private void changeLastArchivedIndexAndNotifyWaiters(long idx) { - lastAbsArchivedIdx = idx; - - if (compressor != null) - compressor.onNextSegmentArchived(); - - archivedMonitor.setLastArchivedAbsoluteIndex(idx); - } - - /** * Gets the absolute index of the next WAL segment available to write. Blocks till there are available file to * write * - * @param curIdx Current absolute index that we want to increment. * @return Next index (curWalSegmIdx+1) when it is ready to be written. * @throws StorageException If exception occurred in the archiver thread. */ - private long nextAbsoluteSegmentIndex(long curIdx) throws StorageException { + private long nextAbsoluteSegmentIndex() throws StorageException, IgniteInterruptedCheckedException { synchronized (this) { if (cleanErr != null) throw cleanErr; - assert curIdx == curAbsWalIdx; - - curAbsWalIdx++; - - // Notify archiver thread. - notifyAll(); + try { + long nextIdx = segmentAware.nextAbsoluteSegmentIndex(); - while (curAbsWalIdx - lastAbsArchivedIdx > dsCfg.getWalSegments() && cleanErr == null) { - try { + // Wait for formatter so that we do not open an empty file in DEFAULT mode. + while (nextIdx % dsCfg.getWalSegments() > formatted && cleanErr == null) wait(); - if (cleanErr != null) - throw cleanErr; - } - catch (InterruptedException ignore) { - interrupted.set(true); - } - } + if (cleanErr != null) + throw cleanErr; - // Wait for formatter so that we do not open an empty file in DEFAULT mode. 
- while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && cleanErr == null) - try { - wait(); - - if (cleanErr != null) - throw cleanErr; - } - catch (InterruptedException ignore) { - interrupted.set(true); - } + return nextIdx; + } + catch (IgniteInterruptedCheckedException e) { + if (cleanErr != null) + throw cleanErr; - return curAbsWalIdx; + throw e; + } + catch (InterruptedException e) { + throw new IgniteInterruptedCheckedException(e); + } } } @@ -1890,55 +1815,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * release segment later, use {@link #releaseWorkSegment} for unlock</li> </ul> */ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) { - synchronized (this) { - if (lastAbsArchivedIdx >= absIdx) { - if (log.isDebugEnabled()) - log.debug("Not needed to reserve WAL segment: absIdx=" + absIdx + ";" + - " lastAbsArchivedIdx=" + lastAbsArchivedIdx); - - return true; - - } - Integer cur = locked.get(absIdx); - - cur = cur == null ? 1 : cur + 1; - - locked.put(absIdx, cur); - - if (log.isDebugEnabled()) - log.debug("Reserved work segment [absIdx=" + absIdx + ", pins=" + cur + ']'); - - return false; - } + public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) { + return segmentAware.checkCanReadArchiveOrReserveWorkSegment(absIdx); } /** * @param absIdx Segment absolute index. 
*/ @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") - private void releaseWorkSegment(long absIdx) { - synchronized (this) { - Integer cur = locked.get(absIdx); - - assert cur != null && cur > 0 : "WAL Segment with Index " + absIdx + " is not locked;" + - " lastAbsArchivedIdx = " + lastAbsArchivedIdx; - - if (cur == 1) { - locked.remove(absIdx); - - if (log.isDebugEnabled()) - log.debug("Fully released work segment (ready to archive) [absIdx=" + absIdx + ']'); - } - else { - locked.put(absIdx, cur - 1); - - if (log.isDebugEnabled()) - log.debug("Partially released work segment [absIdx=" + absIdx + ", pins=" + (cur - 1) + ']'); - } - - notifyAll(); - } + public void releaseWorkSegment(long absIdx) { + segmentAware.releaseWorkSegment(absIdx); } /** @@ -2028,12 +1914,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Current thread stopping advice. */ private volatile boolean stopped; - /** Last successfully compressed segment. */ - private volatile long lastCompressedIdx = -1L; - /** All segments prior to this (inclusive) can be compressed. */ private volatile long minUncompressedIdxToKeep = -1L; - /** * */ @@ -2057,45 +1939,22 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); if (alreadyCompressed.length > 0) - lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].idx(); + segmentAware.lastCompressedIdx(alreadyCompressed[alreadyCompressed.length - 1].idx()); } /** * @param idx Minimum raw segment index that should be preserved from deletion. */ - synchronized void keepUncompressedIdxFrom(long idx) { + void keepUncompressedIdxFrom(long idx) { minUncompressedIdxToKeep = idx; - - notify(); - } - - /** - * Callback for waking up compressor when new segment is archived. 
- */ - synchronized void onNextSegmentArchived() { - notify(); } /** * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. * Waits if there's no segment to archive right now. */ - private long tryReserveNextSegmentOrWait() throws InterruptedException, IgniteCheckedException { - long segmentToCompress = lastCompressedIdx + 1; - - synchronized (this) { - if (stopped) - return -1; - - while (segmentToCompress > archivedMonitor.lastArchivedAbsoluteIndex()) { - wait(); - - if (stopped) - return -1; - } - } - - segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1); + private long tryReserveNextSegmentOrWait() throws IgniteCheckedException { + long segmentToCompress = segmentAware.waitNextSegmentToCompress(); boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0)); @@ -2173,16 +2032,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } - lastCompressedIdx = currReservedSegment; + segmentAware.lastCompressedIdx(currReservedSegment); + } + catch (IgniteInterruptedCheckedException ignore) { + Thread.currentThread().interrupt(); } catch (IgniteCheckedException | IOException e) { U.error(log, "Compression of WAL segment [idx=" + currReservedSegment + "] was skipped due to unexpected error", e); - lastCompressedIdx++; - } - catch (InterruptedException ignore) { - Thread.currentThread().interrupt(); + segmentAware.lastCompressedIdx(currReservedSegment); } finally { if (currReservedSegment != -1) @@ -2201,7 +2060,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int segmentSerializerVer; try (FileIO fileIO = ioFactory.create(raw)) { - segmentSerializerVer = readSegmentHeader(fileIO, nextSegment).getSerializerVersion(); + segmentSerializerVer = readSegmentHeader(new SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion(); } try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new 
FileOutputStream(zip)))) { @@ -2496,18 +2355,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private abstract static class FileHandle { /** I/O interface for read/write operations with file */ - FileIO fileIO; - - /** Absolute WAL segment file index (incremental counter) */ - protected final long idx; + SegmentIO fileIO; /** - * @param fileIO I/O interface for read/write operations of FileHandle. - * @param idx Absolute WAL segment file index (incremental counter). + * @param fileIO I/O interface for read/write operations of FileHandle. * */ - private FileHandle(FileIO fileIO, long idx) { + private FileHandle(SegmentIO fileIO) { this.fileIO = fileIO; - this.idx = idx; + } + + /** + * @return Absolute WAL segment file index (incremental counter). + */ + public long getSegmentId(){ + return fileIO.getSegmentId(); } } @@ -2521,28 +2382,25 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ FileInput in; - /** - * <code>true</code> if this file handle came from work directory. <code>false</code> if this file handle came - * from archive directory. - */ - private boolean workDir; + /** Holder of actual information of latest manipulation on WAL segments. */ + private final SegmentAware segmentAware; /** * @param fileIO I/O interface for read/write operations of FileHandle. - * @param idx Absolute WAL segment file index (incremental counter). * @param ser Entry serializer. * @param in File input. + * @param aware Segment aware. 
*/ public ReadFileHandle( - FileIO fileIO, - long idx, + SegmentIO fileIO, RecordSerializer ser, - FileInput in - ) { - super(fileIO, idx); + FileInput in, + SegmentAware aware) { + super(fileIO); this.ser = ser; this.in = in; + segmentAware = aware; } /** @@ -2551,6 +2409,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Override public void close() throws IgniteCheckedException { try { fileIO.close(); + + in.io().close(); } catch (IOException e) { throw new IgniteCheckedException(e); @@ -2559,7 +2419,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public long idx() { - return idx; + return getSegmentId(); } /** {@inheritDoc} */ @@ -2574,7 +2434,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public boolean workDir() { - return workDir; + return segmentAware != null && segmentAware.lastArchivedAbsoluteIndex() < getSegmentId(); } } @@ -2618,7 +2478,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @param fileIO I/O file interface to use - * @param idx Absolute WAL segment file index for easy access. * @param pos Position. * @param resume Created on resume logging flag. * @param serializer Serializer. @@ -2626,14 +2485,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws IOException If failed. 
*/ private FileWriteHandle( - FileIO fileIO, - long idx, + SegmentIO fileIO, long pos, boolean resume, RecordSerializer serializer, SegmentedRingByteBuffer buf ) throws IOException { - super(fileIO, idx); + super(fileIO); assert serializer != null; @@ -2656,7 +2514,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert seg != null && seg.position() > 0; - prepareSerializerVersionBuffer(idx, serializerVersion(), false, seg.buffer()); + prepareSerializerVersionBuffer(getSegmentId(), serializerVersion(), false, seg.buffer()); seg.release(); } @@ -2692,7 +2550,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (buf == null) return null; // Can not write to this segment, need to switch to the next one. - ptr = new FileWALPointer(idx, pos, rec.size()); + ptr = new FileWALPointer(getSegmentId(), pos, rec.size()); rec.position(ptr); @@ -2735,7 +2593,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException { if (ptr != null) { // If requested obsolete file index, it must be already flushed by close. - if (ptr.index() != idx) + if (ptr.index() != getSegmentId()) return; } @@ -2752,7 +2610,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return; } - assert ptr.index() == idx; + assert ptr.index() == getSegmentId(); walWriter.flushBuffer(ptr.fileOffset()); } @@ -2781,7 +2639,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // If index has changed, it means that the log was rolled over and already sync'ed. // If requested position is smaller than last sync'ed, it also means all is good. // If position is equal, then our record is the last not synced. 
- return idx == ptr.index() && lastFsyncPos <= ptr.fileOffset(); + return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset(); } /** @@ -2791,7 +2649,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl lock.lock(); try { - return new FileWALPointer(idx, (int)written, 0); + return new FileWALPointer(getSegmentId(), (int)written, 0); } finally { lock.unlock(); @@ -2961,11 +2819,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } catch (IOException e) { - throw new StorageException("Failed to close WAL write handle [idx=" + idx + "]", e); + throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e); } if (log.isDebugEnabled()) - log.debug("Closed WAL write handle [idx=" + idx + "]"); + log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]"); return true; } @@ -2989,7 +2847,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { assert cctx.kernalContext().invalid() || written == lastFsyncPos || mode != WALMode.FSYNC : - "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + idx + ']'; + "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + getSegmentId() + ']'; fileIO = null; @@ -3039,12 +2897,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private static class RecordsIterator extends AbstractWalRecordsIterator { /** */ private static final long serialVersionUID = 0L; - /** */ - private final File walWorkDir; /** */ private final File walArchiveDir; + /** */ + private final File walWorkDir; + /** See {@link FileWriteAheadLogManager#archiver}. */ @Nullable private final FileArchiver archiver; @@ -3062,23 +2921,31 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Nullable private FileWALPointer end; + /** Manager of segment location. 
*/ + private SegmentRouter segmentRouter; + + /** Holder of actual information of latest manipulation on WAL segments. */ + private SegmentAware segmentAware; + /** * @param cctx Shared context. - * @param walWorkDir WAL work dir. * @param walArchiveDir WAL archive dir. + * @param walWorkDir WAL dir. * @param start Optional start pointer. * @param end Optional end pointer. * @param dsCfg Database configuration. * @param serializerFactory Serializer factory. * @param archiver File Archiver. * @param decompressor Decompressor. - * @param log Logger - * @throws IgniteCheckedException If failed to initialize WAL segment. + * @param log Logger @throws IgniteCheckedException If failed to initialize WAL segment. + * @param segmentAware Segment aware. + * @param segmentRouter Segment router. + * @param segmentFileInputFactory */ private RecordsIterator( GridCacheSharedContext cctx, - File walWorkDir, File walArchiveDir, + File walWorkDir, @Nullable FileWALPointer start, @Nullable FileWALPointer end, DataStorageConfiguration dsCfg, @@ -3086,20 +2953,28 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl FileIOFactory ioFactory, @Nullable FileArchiver archiver, FileDecompressor decompressor, - IgniteLogger log + IgniteLogger log, + SegmentAware segmentAware, + SegmentRouter segmentRouter, + SegmentFileInputFactory segmentFileInputFactory ) throws IgniteCheckedException { super(log, cctx, serializerFactory, ioFactory, - dsCfg.getWalRecordIteratorBufferSize()); - this.walWorkDir = walWorkDir; + dsCfg.getWalRecordIteratorBufferSize(), + segmentFileInputFactory + ); this.walArchiveDir = walArchiveDir; - this.dsCfg = dsCfg; + this.walWorkDir = walWorkDir; this.archiver = archiver; this.start = start; this.end = end; + this.dsCfg = dsCfg; + this.decompressor = decompressor; + this.segmentRouter = segmentRouter; + this.segmentAware = segmentAware; init(); @@ -3138,10 +3013,7 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl curRec = null; - final AbstractReadFileHandle handle = closeCurrentWalSegment(); - - if (handle != null && handle.workDir()) - releaseWorkSegment(curWalSegmIdx); + closeCurrentWalSegment(); curWalSegmIdx = Integer.MAX_VALUE; } @@ -3196,42 +3068,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Override protected AbstractReadFileHandle advanceSegment( @Nullable final AbstractReadFileHandle curWalSegment ) throws IgniteCheckedException { - if (curWalSegment != null) { + if (curWalSegment != null) curWalSegment.close(); - if (curWalSegment.workDir()) - releaseWorkSegment(curWalSegment.idx()); - - } - // We are past the end marker. if (end != null && curWalSegmIdx + 1 > end.index()) return null; //stop iteration curWalSegmIdx++; - FileDescriptor fd; - - boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); - - if (archiver == null || readArchive) { - fd = new FileDescriptor(new File(walArchiveDir, - FileDescriptor.fileName(curWalSegmIdx))); - } - else { - long workIdx = curWalSegmIdx % dsCfg.getWalSegments(); - - fd = new FileDescriptor( - new File(walWorkDir, FileDescriptor.fileName(workIdx)), - curWalSegmIdx); - } - - if (log.isDebugEnabled()) - log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']'); + boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); //lock during creation handle. ReadFileHandle nextHandle; - try { + FileDescriptor fd = segmentRouter.findSegment(curWalSegmIdx); + + if (log.isDebugEnabled()) + log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']'); + nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? 
start : null); } catch (FileNotFoundException e) { @@ -3241,12 +3095,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl nextHandle = null; } - if (nextHandle == null) { - if (!readArchive) - releaseWorkSegment(curWalSegmIdx); - } - else - nextHandle.workDir = !readArchive; + if (!readArchive) + releaseWorkSegment(curWalSegmIdx); curRec = null; @@ -3316,9 +3166,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ - @Override protected AbstractReadFileHandle createReadFileHandle(FileIO fileIO, long idx, + @Override protected AbstractReadFileHandle createReadFileHandle(SegmentIO fileIO, RecordSerializer ser, FileInput in) { - return new ReadFileHandle(fileIO, idx, ser, in); + return new ReadFileHandle(fileIO, ser, in, segmentAware); } }