IGNITE-5558 - Added ability to read WAL in standalone mode - Fixes #2174. Signed-off-by: Alexey Goncharuk <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44f3fac2 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44f3fac2 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44f3fac2 Branch: refs/heads/ignite-2.1.2-exchange Commit: 44f3fac27bec89b5e70e87564c527e48565ddd2a Parents: ee7566b Author: dpavlov <[email protected]> Authored: Tue Jul 4 20:23:40 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Tue Jul 4 20:23:40 2017 +0300 ---------------------------------------------------------------------- .../PersistentStoreConfiguration.java | 39 +- .../org/apache/ignite/events/EventType.java | 12 + .../ignite/events/WalSegmentArchivedEvent.java | 62 ++ .../internal/pagemem/wal/record/WALRecord.java | 11 +- .../IgniteCacheDatabaseSharedManager.java | 10 +- .../wal/AbstractWalRecordsIterator.java | 289 +++++++++ .../cache/persistence/wal/FileInput.java | 16 +- .../cache/persistence/wal/FileWALPointer.java | 4 +- .../wal/FileWriteAheadLogManager.java | 586 +++++++++---------- .../cache/persistence/wal/RecordSerializer.java | 5 + .../persistence/wal/SegmentArchiveResult.java | 61 ++ .../persistence/wal/SegmentEofException.java | 3 +- .../wal/reader/IgniteWalIteratorFactory.java | 102 ++++ .../wal/reader/StandaloneGridKernalContext.java | 499 ++++++++++++++++ ...ndaloneIgniteCacheDatabaseSharedManager.java | 30 + .../reader/StandaloneWalRecordsIterator.java | 258 ++++++++ .../wal/serializer/RecordV1Serializer.java | 45 +- ...IgnitePersistentStoreDataStructuresTest.java | 2 + .../wal/IgniteWalHistoryReservationsTest.java | 2 +- .../db/wal/reader/IgniteWalReaderTest.java | 385 ++++++++++++ .../db/wal/reader/MockWalIteratorFactory.java | 114 ++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 9 +- 22 files changed, 2194 insertions(+), 350 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java index 1d41d41..b531f9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/PersistentStoreConfiguration.java @@ -55,7 +55,7 @@ public class PersistentStoreConfiguration implements Serializable { /** */ public static final int DFLT_WAL_SEGMENTS = 10; - /** */ + /** Default WAL file segment size, 64MBytes */ public static final int DFLT_WAL_SEGMENT_SIZE = 64 * 1024 * 1024; /** Default wal mode. */ @@ -103,10 +103,10 @@ public class PersistentStoreConfiguration implements Serializable { /** Number of work WAL segments. */ private int walSegments = DFLT_WAL_SEGMENTS; - /** Number of WAL segments to keep. */ + /** Size of one WAL segment in bytes. 64 Mb is used by default. Maximum value is 2Gb */ private int walSegmentSize = DFLT_WAL_SEGMENT_SIZE; - /** WAL persistence path. */ + /** Directory where WAL is stored (work directory) */ private String walStorePath = DFLT_WAL_STORE_PATH; /** WAL archive path. */ @@ -121,7 +121,7 @@ public class PersistentStoreConfiguration implements Serializable { /** WAl thread local buffer size. */ private int tlbSize = DFLT_TLB_SIZE; - /** Wal flush frequency. */ + /** Wal flush frequency in milliseconds. */ private int walFlushFreq = DFLT_WAL_FLUSH_FREQ; /** Wal fsync delay. 
*/ @@ -147,6 +147,11 @@ public class PersistentStoreConfiguration implements Serializable { private long rateTimeInterval = DFLT_RATE_TIME_INTERVAL_MILLIS; /** + * Time interval (in milliseconds) for running auto archiving for incompletely WAL segment + */ + private long walAutoArchiveAfterInactivity = -1; + + /** * Returns a path the root directory where the Persistent Store will persist data and indexes. */ public String getPersistentStorePath() { @@ -297,7 +302,7 @@ public class PersistentStoreConfiguration implements Serializable { } /** - * Gets size of a WAL segment. + * Gets size of a WAL segment in bytes. * * @return WAL segment size. */ @@ -308,7 +313,7 @@ public class PersistentStoreConfiguration implements Serializable { /** * Sets size of a WAL segment. * - * @param walSegmentSize WAL segment size. 64 MB is used by default. + * @param walSegmentSize WAL segment size. 64 MB is used by default. Maximum value is 2Gb * @return {@code this} for chaining. */ public PersistentStoreConfiguration setWalSegmentSize(int walSegmentSize) { @@ -533,6 +538,28 @@ public class PersistentStoreConfiguration implements Serializable { return this; } + /** + * <b>Note:</b> setting this value with {@link WALMode#DEFAULT} may generate file size overhead for WAL segments in case + * grid is used rarely. + * + * @param walAutoArchiveAfterInactivity time in millis to run auto archiving segment (even if incomplete) after last + * record logging. <br> Positive value enables incomplete segment archiving after timeout (inactivity). <br> Zero or + * negative value disables auto archiving. 
+ * @return current configuration instance for chaining + */ + public PersistentStoreConfiguration setWalAutoArchiveAfterInactivity(long walAutoArchiveAfterInactivity) { + this.walAutoArchiveAfterInactivity = walAutoArchiveAfterInactivity; + + return this; + } + + /** + * @return time in millis to run auto archiving WAL segment (even if incomplete) after last record log + */ + public long getWalAutoArchiveAfterInactivity() { + return walAutoArchiveAfterInactivity; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(PersistentStoreConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/events/EventType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/EventType.java b/modules/core/src/main/java/org/apache/ignite/events/EventType.java index 1960692..47b4089 100644 --- a/modules/core/src/main/java/org/apache/ignite/events/EventType.java +++ b/modules/core/src/main/java/org/apache/ignite/events/EventType.java @@ -767,6 +767,18 @@ public interface EventType { public static final int EVT_IGFS_FILE_PURGED = 127; /** + * Built-in event type: WAL segment movement to archive folder completed + * <p> + * Fired for each completed WAL segment which was moved to archive + * <p> + * NOTE: all types in range <b>from 1 to 1000 are reserved</b> for + * internal Ignite events and should not be used by user-defined events. + * + * @see WalSegmentArchivedEvent + */ + public static final int EVT_WAL_SEGMENT_ARCHIVED = 128; + + /** * All checkpoint events. This array can be directly passed into * {@link IgniteEvents#localListen(IgnitePredicate, int...)} method to * subscribe to all checkpoint events. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java new file mode 100644 index 0000000..2fc1715 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/events/WalSegmentArchivedEvent.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.events; + +import java.io.File; +import org.apache.ignite.cluster.ClusterNode; +import org.jetbrains.annotations.NotNull; + +/** + * Event indicates there was movement of WAL segment file to archive has been completed + */ +public class WalSegmentArchivedEvent extends EventAdapter { + /** */ + private static final long serialVersionUID = 0L; + + /** Absolute WAL segment file index. */ + private long absWalSegmentIdx; + + /** Destination archive file. This file is completed and closed archive segment */ + private final File archiveFile; + + /** + * Creates WAL segment event + * + * @param node Node. 
+ * @param absWalSegmentIdx Absolute wal segment index. + * @param archiveFile Archive file. + */ + public WalSegmentArchivedEvent( + @NotNull final ClusterNode node, + final long absWalSegmentIdx, + final File archiveFile) { + super(node, "", EventType.EVT_WAL_SEGMENT_ARCHIVED); + this.absWalSegmentIdx = absWalSegmentIdx; + this.archiveFile = archiveFile; + } + + /** @return {@link #archiveFile} */ + public File getArchiveFile() { + return archiveFile; + } + + /** @return {@link #absWalSegmentIdx} */ + public long getAbsWalSegmentIdx() { + return absWalSegmentIdx; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java index 678e1fa..89f3c86 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/WALRecord.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.pagemem.wal.record; +import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -26,7 +27,8 @@ import org.apache.ignite.internal.util.typedef.internal.S; */ public abstract class WALRecord { /** - * Record type. + * Record type. Ordinal of this record will be written to file. <br> + * <b>Note:</b> Do not change order of elements <br> */ public enum RecordType { /** */ @@ -171,6 +173,13 @@ public abstract class WALRecord { public static RecordType fromOrdinal(int ord) { return ord < 0 || ord >= VALS.length ? 
null : VALS[ord]; } + + /** + * Fake record type, causes stop iterating and indicates segment EOF + * <b>Note:</b> regular record type is incremented by 1 and minimal value written to file is also 1 + * For {@link WALMode#DEFAULT} this value is at least came from padding + */ + public static final int STOP_ITERATION_RECORD_TYPE = 0; } /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java index ec0e895..f04c278 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/IgniteCacheDatabaseSharedManager.java @@ -92,7 +92,7 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap /** */ private FreeListImpl dfltFreeList; - /** */ + /** Page size from memory configuration, may be set only for fake(standalone) IgniteCacheDataBaseSharedManager */ private int pageSize; /** {@inheritDoc} */ @@ -961,4 +961,12 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap public String systemMemoryPolicyName() { return SYSTEM_MEMORY_POLICY_NAME; } + + /** + * Method for fake (standalone) context initialization. 
Not to be called in production code + * @param pageSize configured page size + */ + protected void setPageSize(int pageSize) { + this.pageSize = pageSize; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java new file mode 100644 index 0000000..7dc0a28 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.EOFException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; +import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Iterator over WAL segments. This abstract class provides most functionality for reading records in log. + * Subclasses are to override segment switching functionality + */ +public abstract class AbstractWalRecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> + implements WALIterator { + /** */ + private static final long serialVersionUID = 0L; + + /** + * Current record preloaded, to be returned on next()<br> + * Normally this should be not null because advance() method should already prepare some value<br> + */ + protected IgniteBiTuple<WALPointer, WALRecord> curRec; + + /** + * Current WAL segment absolute index. <br> + * Determined as lowest number of file at start, is changed during advance segment + */ + protected long curWalSegmIdx = -1; + + /** + * Current WAL segment read file handle. 
To be filled by subclass advanceSegment + */ + private FileWriteAheadLogManager.ReadFileHandle currWalSegment; + + /** Logger */ + @NotNull protected final IgniteLogger log; + + /** Shared context for creating serializer of required version and grid name access */ + @NotNull private final GridCacheSharedContext sharedCtx; + + /** Serializer of current version to read headers. */ + @NotNull private final RecordSerializer serializer; + + /** Utility buffer for reading records */ + private final ByteBuffer buf; + + /** + * @param log Logger + * @param sharedCtx Shared context + * @param serializer Serializer of current version to read headers. + * @param bufSize buffer for reading records size + */ + protected AbstractWalRecordsIterator( + @NotNull final IgniteLogger log, + @NotNull final GridCacheSharedContext sharedCtx, + @NotNull final RecordSerializer serializer, + final int bufSize) { + this.log = log; + this.sharedCtx = sharedCtx; + this.serializer = serializer; + + // Do not allocate direct buffer for iterator. 
+ buf = ByteBuffer.allocate(bufSize); + buf.order(ByteOrder.nativeOrder()); + + } + + /** + * Scans provided folder for a WAL segment files + * @param walFilesDir directory to scan + * @return found WAL file descriptors + */ + protected static FileWriteAheadLogManager.FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException { + final File[] files = walFilesDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER); + + if (files == null) { + throw new IgniteCheckedException("WAL files directory does not not denote a " + + "directory, or if an I/O error occurs: [" + walFilesDir.getAbsolutePath() + "]"); + } + return FileWriteAheadLogManager.scan(files); + } + + /** {@inheritDoc} */ + @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException { + IgniteBiTuple<WALPointer, WALRecord> ret = curRec; + + advance(); + + return ret; + } + + /** {@inheritDoc} */ + @Override protected boolean onHasNext() throws IgniteCheckedException { + return curRec != null; + } + + /** + * Switches records iterator to the next record. + * <ul> + * <li>{@link #curRec} will be updated.</li> + * <li> If end of segment reached, switch to new segment is called. {@link #currWalSegment} will be updated.</li> + * </ul> + * + * {@code advance()} runs a step ahead {@link #next()} + * + * @throws IgniteCheckedException If failed. 
+ */ + protected void advance() throws IgniteCheckedException { + while (true) { + curRec = advanceRecord(currWalSegment); + + if (curRec != null) + return; + else { + currWalSegment = advanceSegment(currWalSegment); + + if (currWalSegment == null) + return; + } + } + } + + /** + * Closes and returns WAL segment (if any) + * @return closed handle + * @throws IgniteCheckedException if IO failed + */ + @Nullable protected FileWriteAheadLogManager.ReadFileHandle closeCurrentWalSegment() throws IgniteCheckedException { + final FileWriteAheadLogManager.ReadFileHandle walSegmentClosed = currWalSegment; + + if (walSegmentClosed != null) { + walSegmentClosed.close(); + currWalSegment = null; + } + return walSegmentClosed; + } + + /** + * Switches records iterator to the next WAL segment + * as result of this method, new reference to segment should be returned. + * Null for current handle means stop of iteration + * @throws IgniteCheckedException if reading failed + * @param curWalSegment current open WAL segment or null if there is no open segment yet + * @return new WAL segment to read or null for stop iteration + */ + protected abstract FileWriteAheadLogManager.ReadFileHandle advanceSegment( + @Nullable final FileWriteAheadLogManager.ReadFileHandle curWalSegment) throws IgniteCheckedException; + + /** + * Switches to new record + * @param hnd currently opened read handle + * @return next advanced record + */ + private IgniteBiTuple<WALPointer, WALRecord> advanceRecord( + @Nullable final FileWriteAheadLogManager.ReadFileHandle hnd) { + if (hnd == null) + return null; + + final FileWALPointer ptr = new FileWALPointer( + hnd.idx, + (int)hnd.in.position(), + 0); + + try { + final WALRecord rec = hnd.ser.readRecord(hnd.in, ptr); + + ptr.length(rec.size()); + + // cast using diamond operator here can break compile for 7 + return new IgniteBiTuple<>((WALPointer)ptr, rec); + } + catch (IOException | IgniteCheckedException e) { + if (!(e instanceof SegmentEofException)) + 
handleRecordException(e, ptr); + return null; + } + } + + /** + * Handler for record deserialization exception + * @param e problem from records reading + * @param ptr file pointer was accessed + */ + protected void handleRecordException( + @NotNull final Exception e, + @Nullable final FileWALPointer ptr) { + if (log.isInfoEnabled()) + log.info("Stopping WAL iteration due to an exception: " + e.getMessage()); + } + + /** + * @param desc File descriptor. + * @param start Optional start pointer. Null means read from the beginning + * @return Initialized file handle. + * @throws FileNotFoundException If segment file is missing. + * @throws IgniteCheckedException If initialized failed due to another unexpected error. + */ + protected FileWriteAheadLogManager.ReadFileHandle initReadHandle( + @NotNull final FileWriteAheadLogManager.FileDescriptor desc, + @Nullable final FileWALPointer start) + throws IgniteCheckedException, FileNotFoundException { + try { + RandomAccessFile rf = new RandomAccessFile(desc.file, "r"); + + try { + FileChannel ch = rf.getChannel(); + FileInput in = new FileInput(ch, buf); + + // Header record must be agnostic to the serializer version. 
+ WALRecord rec = serializer.readRecord(in, + new FileWALPointer(desc.idx, (int)ch.position(), 0)); + + if (rec == null) + return null; + + if (rec.type() != WALRecord.RecordType.HEADER_RECORD) + throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile()); + + int ver = ((HeaderRecord)rec).version(); + + RecordSerializer ser = FileWriteAheadLogManager.forVersion(sharedCtx, ver); + + if (start != null && desc.idx == start.index()) + in.seek(start.fileOffset()); + + return new FileWriteAheadLogManager.ReadFileHandle(rf, desc.idx, sharedCtx.igniteInstanceName(), ser, in); + } + catch (SegmentEofException | EOFException ignore) { + try { + rf.close(); + } + catch (IOException ce) { + throw new IgniteCheckedException(ce); + } + + return null; + } + catch (IOException | IgniteCheckedException e) { + try { + rf.close(); + } + catch (IOException ce) { + e.addSuppressed(ce); + } + + throw e; + } + } + catch (FileNotFoundException e) { + throw e; + } + catch (IOException e) { + throw new IgniteCheckedException( + "Failed to initialize WAL segment: " + desc.file.getAbsolutePath(), e); + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java index be1e477..e2d7cba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileInput.java @@ -26,21 +26,25 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaC import org.jetbrains.annotations.NotNull; /** - * File input. 
+ * File input, backed by byte buffer file input. + * This class allows to read data by chunks from file and then read primitives */ public final class FileInput implements ByteBufferBackedDataInput { - /** */ + /** + * Buffer for reading blocks of data into. + * <b>Note:</b> biggest block requested from this input can't be longer than buffer capacity + */ private ByteBuffer buf; - /** */ + /** File channel to read chunks from */ private FileChannel ch; /** */ private long pos; /** - * @param ch Channel. - * @param buf Buffer. + * @param ch Channel to read from + * @param buf Buffer for reading blocks of data into */ public FileInput(FileChannel ch, ByteBuffer buf) throws IOException { assert ch != null; @@ -101,7 +105,7 @@ public final class FileInput implements ByteBufferBackedDataInput { int read = ch.read(buf); if (read == -1) - throw new EOFException(); + throw new EOFException("EOF at position [" + ch.position() + "] expected to read [" + requested + "] bytes"); available += read; http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java index b6ddfb8..3716de2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWALPointer.java @@ -46,7 +46,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> { } /** - * @param idx File timestamp index. + * @param idx Absolute WAL segment file index . * @param fileOffset Offset in file, from the beginning. * @param len Record length. 
* @param forceFlush Force flush flag. @@ -59,7 +59,7 @@ public class FileWALPointer implements WALPointer, Comparable<FileWALPointer> { } /** - * @return Timestamp index. + * @return Absolute WAL segment file index . */ public long index() { return idx; http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 5918141..f877a14 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.channels.FileChannel; import java.nio.file.Files; +import java.sql.Time; import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -34,11 +35,11 @@ import java.util.NavigableMap; import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import org.apache.ignite.IgniteCheckedException; @@ -46,8 +47,10 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.configuration.IgniteConfiguration; import 
org.apache.ignite.configuration.PersistentStoreConfiguration; import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.events.EventType; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.pagemem.wal.WALIterator; @@ -58,9 +61,11 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.PersistenceMetricsImpl; +import org.apache.ignite.events.WalSegmentArchivedEvent; import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; -import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; +import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; @@ -69,6 +74,7 @@ import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -85,14 +91,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private 
static final byte[] FILL_BUF = new byte[1024 * 1024]; - /** */ + /** Pattern for segment file names */ private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal"); /** */ private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp"); - /** */ - private static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() { + /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */ + public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() { @Override public boolean accept(File file) { return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches(); } @@ -118,7 +124,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final int tlbSize; /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} log WALMode. */ - public final int flushFreq; + private final int flushFreq; /** Fsync delay. */ private final long fsyncDelay; @@ -126,6 +132,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private final PersistentStoreConfiguration psCfg; + /** Events service */ + private final GridEventStorageManager evt; + /** */ private IgniteConfiguration igCfg; @@ -135,10 +144,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private File walWorkDir; - /** */ + /** WAL archive directory (including consistent ID as subfolder) */ private File walArchiveDir; - /** */ + /** Serializer of current version, used to read header record and for write records */ private RecordSerializer serializer; /** */ @@ -167,18 +176,41 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private volatile FileArchiver archiver; /** */ - private QueueFlusher flusher; - - /** */ private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>(); /** Current log segment handle */ private volatile FileWriteHandle currentHnd; /** + * Positive (non-0) 
value indicates WAL can be archived even if not complete<br> + * See {@link PersistentStoreConfiguration#setWalAutoArchiveAfterInactivity(long)}<br> + */ + private final long walAutoArchiveAfterInactivity; + + /** + * Container with last WAL record logged timestamp.<br> + * Zero value means there was no records logged to current segment, skip possible archiving for this case<br> + * Value is filled only for case {@link #walAutoArchiveAfterInactivity} > 0<br> + */ + private AtomicLong lastRecordLoggedMs = new AtomicLong(); + + /** + * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown + * Null for non background modes + */ + @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule; + + /** + * Reference to the last added next archive timeout check object. + * Null if mode is not enabled. + * Should be cancelled at shutdown + */ + @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj; + + /** * @param ctx Kernal context. 
*/ - public FileWriteAheadLogManager(GridKernalContext ctx) { + public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) { igCfg = ctx.config(); PersistentStoreConfiguration psCfg = igCfg.getPersistentStoreConfiguration(); @@ -193,6 +225,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl flushFreq = psCfg.getWalFlushFrequency(); fsyncDelay = psCfg.getWalFsyncDelay(); alwaysWriteFullPages = psCfg.isAlwaysWriteFullPages(); + walAutoArchiveAfterInactivity = psCfg.getWalAutoArchiveAfterInactivity(); + evt = ctx.event(); } /** {@inheritDoc} */ @@ -248,8 +282,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (psCfg.getWalStorePath() == null ^ psCfg.getWalArchivePath() == null) { throw new IgniteCheckedException( "Properties should be either both specified or both null " + - "[walStorePath = " + psCfg.getWalStorePath() + - ", walArchivePath = " + psCfg.getWalArchivePath() + "]" + "[walStorePath = " + psCfg.getWalStorePath() + + ", walArchivePath = " + psCfg.getWalArchivePath() + "]" ); } } @@ -271,26 +305,32 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override protected void stop0(boolean cancel) { - FileWriteHandle currentHnd = currentHandle(); + final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule; - try { - QueueFlusher flusher0 = flusher; + if (schedule != null) + schedule.close(); - if (flusher0 != null) { - flusher0.shutdown(); + final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj; - if (currentHnd != null) - currentHnd.flush((FileWALPointer)null); + if (timeoutObj != null) + cctx.time().removeTimeoutObject(timeoutObj); + + final FileWriteHandle currHnd = currentHandle(); + + try { + if (mode == WALMode.BACKGROUND) { + if (currHnd != null) + currHnd.flush((FileWALPointer)null); } - if (currentHnd != null) - currentHnd.close(false); + if (currHnd != null) + currHnd.close(false); if 
(archiver != null) archiver.shutdown(); } catch (Exception e) { - U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.file, e); + U.error(log, "Failed to gracefully close WAL segment: " + currHnd.file, e); } } @@ -350,39 +390,114 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } if (mode == WALMode.BACKGROUND) { - flusher = new QueueFlusher(cctx.igniteInstanceName()); - - flusher.start(); + backgroundFlushSchedule = cctx.time().schedule(new Runnable() { + @Override public void run() { + doFlush(); + } + }, flushFreq, flushFreq); } + + if (walAutoArchiveAfterInactivity > 0) + scheduleNextInactivityPeriodElapsedCheck(); } catch (StorageException e) { throw new IgniteCheckedException(e); } } + /** + * Schedules next check of inactivity period expired. Based on current record update timestamp. + * At timeout method does check of inactivity period and schedules new launch. + */ + private void scheduleNextInactivityPeriodElapsedCheck() { + final long lastRecMs = lastRecordLoggedMs.get(); + final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity; + + if (log.isDebugEnabled()) + log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString()); + + nextAutoArchiveTimeoutObj = new GridTimeoutObject() { + private final IgniteUuid id = IgniteUuid.randomUuid(); + + @Override public IgniteUuid timeoutId() { + return id; + } + + @Override public long endTime() { + return nextPossibleAutoArchive; + } + + @Override public void onTimeout() { + if (log.isDebugEnabled()) + log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")"); + + checkWalRolloverRequiredDuringInactivityPeriod(); + + scheduleNextInactivityPeriodElapsedCheck(); + } + }; + cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj); + } + + /** + * Checks if there was elapsed significant period of inactivity. 
+ * If WAL auto-archive is enabled using {@link #walAutoArchiveAfterInactivity} > 0 this method will activate + * roll over by timeout<br> + */ + private void checkWalRolloverRequiredDuringInactivityPeriod() { + if (walAutoArchiveAfterInactivity <= 0) + return; // feature not configured, nothing to do + + final long lastRecMs = lastRecordLoggedMs.get(); + + if (lastRecMs == 0) + return; //no records were logged to current segment, does not consider inactivity + + final long elapsedMs = U.currentTimeMillis() - lastRecMs; + + if (elapsedMs <= walAutoArchiveAfterInactivity) + return; // not enough time elapsed since last write + + if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0)) + return; // record write occurred concurrently + + final FileWriteHandle handle = currentHandle(); + + try { + rollOver(handle); + } + catch (IgniteCheckedException e) { + U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); + handle.invalidateEnvironment(e); + } + } + /** {@inheritDoc} */ @SuppressWarnings("TooBroadScope") @Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException { if (serializer == null || mode == WALMode.NONE) return null; - FileWriteHandle current = currentHandle(); + FileWriteHandle currWrHandle = currentHandle(); // Logging was not resumed yet. - if (current == null) + if (currWrHandle == null) return null; // Need to calculate record size first. 
record.size(serializer.size(record)); - for (; ; current = rollOver(current)) { - WALPointer ptr = current.addRecord(record); + for (; ; currWrHandle = rollOver(currWrHandle)) { + WALPointer ptr = currWrHandle.addRecord(record); if (ptr != null) { metrics.onWalRecordLogged(); lastWALPtr.set(ptr); + if (walAutoArchiveAfterInactivity > 0) + lastRecordLoggedMs.set(U.currentTimeMillis()); + return ptr; } @@ -665,6 +780,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl assert swapped : "Concurrent updates on rollover are not allowed"; + if (walAutoArchiveAfterInactivity > 0) + lastRecordLoggedMs.set(0); + // Let other threads to proceed with new segment. hnd.signalNextAvailable(); } @@ -888,7 +1006,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param ver Serializer version. * @return Entry serializer. */ - private static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException { + static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException { if (ver <= 0) throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file)."); @@ -905,7 +1023,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @return Sorted WAL files descriptors. 
*/ - private static FileDescriptor[] scan(File[] allFiles) { + public static FileDescriptor[] scan(File[] allFiles) { if (allFiles == null) return EMPTY_DESCRIPTORS; @@ -931,11 +1049,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * Monitor of current object is used for notify on: * <ul> - * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li> - * <li>stopping thread ({@link FileArchiver#stopped}==true)</li> - * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li> - * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li> - * <li>some WAL index was removed from {@link FileArchiver#locked} map</li> + * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li> + * <li>stopping thread ({@link FileArchiver#stopped}==true)</li> + * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li> + * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li> + * <li>some WAL index was removed from {@link FileArchiver#locked} map</li> * </ul> */ private class FileArchiver extends Thread { @@ -1017,6 +1135,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * Check if WAL segment locked or reserved + * * @param absIdx Index for check reservation. * @return {@code True} if index is reserved. 
*/ @@ -1080,7 +1199,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl break; try { - File workFile = archiveSegment(toArchive); + final SegmentArchiveResult res = archiveSegment(toArchive); synchronized (this) { while (locked.containsKey(toArchive) && !stopped) @@ -1088,13 +1207,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // Firstly, format working file if (!stopped) - formatFile(workFile); + formatFile(res.getOrigWorkFile()); // Then increase counter to allow rollover on clean working file lastAbsArchivedIdx = toArchive; notifyAll(); } + if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED)) + evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(), + res.getAbsIdx(), res.getDstArchiveFile())); } catch (IgniteCheckedException e) { synchronized (this) { @@ -1115,7 +1237,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Blocks till there are available file to write * * @param curIdx Current absolute index that we want to increment. - * @return Next index (curIdx+1) when it is ready to be written. + * @return Next index (curWalSegmIdx+1) when it is ready to be written. * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread). */ private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException { @@ -1195,9 +1317,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Moves WAL segment from work folder to archive folder. + * Temp file is used to do movement + * * @param absIdx Absolute index to archive. 
*/ - private File archiveSegment(long absIdx) throws IgniteCheckedException { + private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException { long segIdx = absIdx % psCfg.getWalSegments(); File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx)); @@ -1235,7 +1360,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl log.debug("Copied file [src=" + origFile.getAbsolutePath() + ", dst=" + dstFile.getAbsolutePath() + ']'); - return origFile; + return new SegmentArchiveResult(absIdx, origFile, dstFile); } /** @@ -1316,7 +1441,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * WAL file descriptor. */ - private static class FileDescriptor implements Comparable<FileDescriptor> { + public static class FileDescriptor implements Comparable<FileDescriptor> { /** */ protected final File file; @@ -1324,9 +1449,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl protected final long idx; /** - * @param file File. + * Creates file descriptor. Index is restored from file name + * + * @param file WAL segment file. */ - private FileDescriptor(File file) { + public FileDescriptor(@NotNull File file) { this(file, null); } @@ -1334,7 +1461,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param file WAL segment file. * @param idx Absolute WAL segment file index. For null value index is restored from file name */ - private FileDescriptor(@NotNull File file, @Nullable Long idx) { + public FileDescriptor(@NotNull File file, @Nullable Long idx) { this.file = file; String fileName = file.getName(); @@ -1350,7 +1477,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param segment Segment index. * @return Segment file name. 
*/ - private static String fileName(long segment) { + public static String fileName(long segment) { SB b = new SB(); String segmentStr = Long.toString(segment); @@ -1402,6 +1529,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl @Override public int hashCode() { return (int)(idx ^ (idx >>> 32)); } + + /** + * @return Absolute WAL segment file index + */ + public long getIdx() { + return idx; + } + + /** + * @return absolute pathname string of this file descriptor pathname. + */ + public String getAbsolutePath() { + return file.getAbsolutePath(); + } } /** @@ -1438,14 +1579,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * */ - private static class ReadFileHandle extends FileHandle { + public static class ReadFileHandle extends FileHandle { /** Entry serializer. */ - private RecordSerializer ser; + RecordSerializer ser; /** */ - private FileInput in; + FileInput in; - /** */ + /** + * <code>true</code> if this file handle came from work directory. + * <code>false</code> if this file handle came from archive directory. + */ private boolean workDir; /** @@ -1454,7 +1598,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param ser Entry serializer. * @param in File input. */ - private ReadFileHandle( + ReadFileHandle( RandomAccessFile file, long idx, String gridName, @@ -1499,7 +1643,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private final AtomicReference<WALRecord> head = new AtomicReference<>(); - /** Position in current file after the end of last written record (incremented after file channel write operation) */ + /** + * Position in current file after the end of last written record (incremented after file channel write + * operation) + */ private volatile long written; /** */ @@ -1508,7 +1655,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** Environment failure. 
*/ private volatile Throwable envFailed; - /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}*/ + /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)} */ private final AtomicBoolean stop = new AtomicBoolean(false); /** */ @@ -1754,6 +1901,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * Serializes WAL records chain to provided byte buffer + * * @param buf Buffer, will be filled with records chain from end to beginning * @param head Head of the chain to write to the buffer. * @return Position in file for this buffer. @@ -1886,11 +2034,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl flushOrWait(null); try { - if (rollOver && written < (maxSegmentSize - 1)) { - ByteBuffer allocate = ByteBuffer.allocate(1); - allocate.put((byte) WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal()); - - ch.write(allocate, written); + int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE; + if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { + //it is expected there is sufficient space for this record because rollover should run early + final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); + buf.put((byte)(WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal() + 1)); + final FileWALPointer pointer = new FileWALPointer(idx, (int)ch.position(), -1); + RecordV1Serializer.putPosition(buf, pointer); + buf.rewind(); + ch.write(buf, written); if (mode == WALMode.DEFAULT) ch.force(false); @@ -1951,8 +2103,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * @param pos Position in file to start write from. - * May be checked against actual position to wait previous writes to complete + * @param pos Position in file to start write from. 
May be checked against actual position to wait previous + * writes to complete * @param buf Buffer to write to file * @throws StorageException If failed. * @throws IgniteCheckedException If failed. @@ -2133,8 +2285,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * Iterator over WAL-log. */ - private static class RecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> - implements WALIterator { + private static class RecordsIterator extends AbstractWalRecordsIterator { /** */ private static final long serialVersionUID = 0L; /** */ @@ -2149,33 +2300,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private final PersistentStoreConfiguration psCfg; - /** */ - private final RecordSerializer serializer; - - /** */ - private final GridCacheSharedContext cctx; - - /** */ + /** Optional start pointer. */ + @Nullable private FileWALPointer start; - /** */ + /** Optional end pointer. */ + @Nullable private FileWALPointer end; - /** */ - private IgniteBiTuple<WALPointer, WALRecord> curRec; - - /** */ - private long curIdx = -1; - - /** */ - private ReadFileHandle curHandle; - - /** */ - private ByteBuffer buf; - - /** */ - private IgniteLogger log; - /** * @param cctx Shared context. * @param walWorkDir WAL work dir. @@ -2183,37 +2315,33 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param start Optional start pointer. * @param end Optional end pointer. * @param psCfg Database configuration. - * @param serializer Serializer. + * @param serializer Serializer of current version to read headers. * @param archiver Archiver. + * @param log Logger * @throws IgniteCheckedException If failed to initialize WAL segment. 
*/ private RecordsIterator( GridCacheSharedContext cctx, File walWorkDir, File walArchiveDir, - FileWALPointer start, - FileWALPointer end, + @Nullable FileWALPointer start, + @Nullable FileWALPointer end, PersistentStoreConfiguration psCfg, - RecordSerializer serializer, + @NotNull RecordSerializer serializer, FileArchiver archiver, IgniteLogger log, int tlbSize ) throws IgniteCheckedException { - this.cctx = cctx; + super(log, + cctx, + serializer, + Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize())); this.walWorkDir = walWorkDir; this.walArchiveDir = walArchiveDir; this.psCfg = psCfg; - this.serializer = serializer; this.archiver = archiver; this.start = start; this.end = end; - this.log = log; - - int buffSize = Math.min(16 * tlbSize, psCfg.getWalRecordIteratorBufferSize()); - - // Do not allocate direct buffer for iterator. - buf = ByteBuffer.allocate(buffSize); - buf.order(ByteOrder.nativeOrder()); init(); @@ -2221,40 +2349,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** {@inheritDoc} */ - @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException { - IgniteBiTuple<WALPointer, WALRecord> ret = curRec; - - advance(); - - return ret; - } - - /** {@inheritDoc} */ - @Override protected boolean onHasNext() throws IgniteCheckedException { - return curRec != null; - } - - /** {@inheritDoc} */ @Override protected void onClose() throws IgniteCheckedException { curRec = null; - if (curHandle != null) { - curHandle.close(); + final ReadFileHandle handle = closeCurrentWalSegment(); + if (handle != null && handle.workDir) + releaseWorkSegment(curWalSegmIdx); - if (curHandle.workDir) - releaseWorkSegment(curIdx); - - curHandle = null; - } - - curIdx = Integer.MAX_VALUE; + curWalSegmIdx = Integer.MAX_VALUE; } /** * @throws IgniteCheckedException If failed to initialize first file handle. 
*/ private void init() throws IgniteCheckedException { - FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)); + FileDescriptor[] descs = loadFileDescriptors(walArchiveDir); if (start != null) { if (!F.isEmpty(descs)) { @@ -2264,13 +2373,13 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl for (FileDescriptor desc : descs) { if (desc.idx == start.index()) { - curIdx = start.index(); + curWalSegmIdx = start.index(); break; } } - if (curIdx == -1) { + if (curWalSegmIdx == -1) { long lastArchived = descs[descs.length - 1].idx; if (lastArchived > start.index()) @@ -2278,203 +2387,86 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // This pointer may be in work files because archiver did not // copy the file yet, check that it is not too far forward. - curIdx = start.index(); + curWalSegmIdx = start.index(); } } else { // This means that whole checkpoint history fits in one segment in WAL work directory. // Will start from this index right away. - curIdx = start.index(); + curWalSegmIdx = start.index(); } } else - curIdx = !F.isEmpty(descs) ? descs[0].idx : 0; + curWalSegmIdx = !F.isEmpty(descs) ? descs[0].idx : 0; - curIdx--; + curWalSegmIdx--; if (log.isDebugEnabled()) - log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curIdx=" + curIdx + ']'); - } - - /** - * @throws IgniteCheckedException If failed. 
- */ - private void advance() throws IgniteCheckedException { - while (true) { - advanceRecord(); - - if (curRec != null) - return; - else { - advanceSegment(); - - if (curHandle == null) - return; - } - } - } - - /** - * - */ - private void advanceRecord() { - try { - ReadFileHandle hnd = curHandle; - - if (hnd != null) { - RecordSerializer ser = hnd.ser; - - int pos = (int)hnd.in.position(); - - FileWALPointer ptr = new FileWALPointer(hnd.idx, pos, 0); - - WALRecord rec = ser.readRecord(hnd.in, ptr); - - ptr.length(rec.size()); - - curRec = new IgniteBiTuple<WALPointer, WALRecord>(ptr, rec); - } - } - catch (IOException | IgniteCheckedException e) { - if (!(e instanceof SegmentEofException)) { - if (log.isInfoEnabled()) - log.info("Stopping WAL iteration due to an exception: " + e.getMessage()); - } - - curRec = null; - } + log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curWalSegmIdx=" + curWalSegmIdx + ']'); } - /** - * @throws IgniteCheckedException If failed. - */ - private void advanceSegment() throws IgniteCheckedException { - ReadFileHandle cur0 = curHandle; - - if (cur0 != null) { - cur0.close(); + /** {@inheritDoc} */ + @Override protected ReadFileHandle advanceSegment( + @Nullable final ReadFileHandle curWalSegment) throws IgniteCheckedException { + if (curWalSegment != null) { + curWalSegment.close(); - if (cur0.workDir) - releaseWorkSegment(cur0.idx); + if (curWalSegment.workDir) + releaseWorkSegment(curWalSegment.idx); - curHandle = null; } // We are past the end marker. 
- if (end != null && curIdx + 1 > end.index()) - return; + if (end != null && curWalSegmIdx + 1 > end.index()) + return null; //stop iteration - curIdx++; + curWalSegmIdx++; FileDescriptor fd; - boolean readArchive = canReadArchiveOrReserveWork(curIdx); + boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); if (readArchive) { fd = new FileDescriptor(new File(walArchiveDir, - FileDescriptor.fileName(curIdx))); + FileDescriptor.fileName(curWalSegmIdx))); } else { - long workIdx = curIdx % psCfg.getWalSegments(); + long workIdx = curWalSegmIdx % psCfg.getWalSegments(); fd = new FileDescriptor( new File(walWorkDir, FileDescriptor.fileName(workIdx)), - curIdx); + curWalSegmIdx); } if (log.isDebugEnabled()) - log.debug("Reading next file [absIdx=" + curIdx + ", file=" + fd.file.getAbsolutePath() + ']'); + log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']'); assert fd != null; + ReadFileHandle nextHandle; try { - curHandle = initReadHandle(fd, start != null && curIdx == start.index() ? start : null); + nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null); } catch (FileNotFoundException e) { if (readArchive) throw new IgniteCheckedException("Missing WAL segment in the archive", e); else - curHandle = null; + nextHandle = null; } - if (curHandle != null) - curHandle.workDir = !readArchive; + if (nextHandle != null) + nextHandle.workDir = !readArchive; else - releaseWorkSegment(curIdx); + releaseWorkSegment(curWalSegmIdx); curRec = null; - } - - /** - * @param desc File descriptor. - * @param start Optional start pointer. - * @return Initialized file handle. - * @throws FileNotFoundException If segment file is missing. - * @throws IgniteCheckedException If initialized failed due to another unexpected error. 
- */ - private ReadFileHandle initReadHandle(FileDescriptor desc, FileWALPointer start) - throws IgniteCheckedException, FileNotFoundException { - try { - RandomAccessFile rf = new RandomAccessFile(desc.file, "r"); - - try { - FileChannel channel = rf.getChannel(); - FileInput in = new FileInput(channel, buf); - - // Header record must be agnostic to the serializer version. - WALRecord rec = serializer.readRecord(in, - new FileWALPointer(desc.idx, (int)channel.position(), 0)); - - if (rec == null) - return null; - - if (rec.type() != WALRecord.RecordType.HEADER_RECORD) - throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile()); - - int ver = ((HeaderRecord)rec).version(); - - RecordSerializer ser = forVersion(cctx, ver); - - if (start != null && desc.idx == start.index()) - in.seek(start.fileOffset()); - - return new ReadFileHandle(rf, desc.idx, cctx.igniteInstanceName(), ser, in); - } - catch (SegmentEofException | EOFException ignore) { - try { - rf.close(); - } - catch (IOException ce) { - throw new IgniteCheckedException(ce); - } - - return null; - } - catch (IOException | IgniteCheckedException e) { - try { - rf.close(); - } - catch (IOException ce) { - e.addSuppressed(ce); - } - - throw e; - } - } - catch (FileNotFoundException e) { - throw e; - } - catch (IOException e) { - throw new IgniteCheckedException( - "Failed to initialize WAL segment: " + desc.file.getAbsolutePath(), e); - } + return nextHandle; } /** * @param absIdx Absolute index to check. - * @return {@code True} if we can safely read the archive, {@code false} if the segment has not been - * archived yet. In this case the corresponding work segment is reserved (will not be deleted until - * release). + * @return {@code True} if we can safely read the archive, {@code false} if the segment has not been archived + * yet. In this case the corresponding work segment is reserved (will not be deleted until release). 
*/ private boolean canReadArchiveOrReserveWork(long absIdx) { return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx); @@ -2490,51 +2482,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Periodically flushes current file handle for {@link WALMode#BACKGROUND} WALMode. + * Flushes current file handle for {@link WALMode#BACKGROUND} WALMode. + * Called periodically from scheduler. */ - private class QueueFlusher extends Thread { - /** */ - private volatile boolean stopped; + private void doFlush() { + final FileWriteHandle hnd = currentHandle(); - /** - * @param gridName Grid name. - */ - private QueueFlusher(String gridName) { - super("wal-queue-flusher-#" + gridName); - } - - /** {@inheritDoc} */ - @Override public void run() { - while (!stopped) { - long wakeup = U.currentTimeMillis() + flushFreq; - - LockSupport.parkUntil(wakeup); - - FileWriteHandle hnd = currentHandle(); - - try { - hnd.flush(hnd.head.get()); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to flush WAL record queue", e); - } - } + try { + hnd.flush(hnd.head.get()); } - - /** - * Signals stop, wakes up thread and waiting until completion. - */ - private void shutdown() { - stopped = true; - - LockSupport.unpark(this); - - try { - join(); - } - catch (InterruptedException ignore) { - // Got interrupted while waiting for flusher to shutdown. 
- } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to flush WAL record queue", e); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java index 75a62a9..1ea7fa6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/RecordSerializer.java @@ -33,6 +33,8 @@ public interface RecordSerializer { public int version(); /** + * Calculates record size in byte including expected wal pointer, CRC and type field + * * @param record Record. * @return Size in bytes. */ @@ -45,7 +47,10 @@ public interface RecordSerializer { public void writeRecord(WALRecord record, ByteBuffer buf) throws IgniteCheckedException; /** + * Loads record from input + * * @param in Data input to read data from. + * @param expPtr expected WAL pointer for record. Used to validate actual position against expected from the file * @return Read entry. 
*/ public WALRecord readRecord(FileInput in, WALPointer expPtr) throws IOException, IgniteCheckedException; http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java new file mode 100644 index 0000000..5b65970 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentArchiveResult.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.io.File; + +/** + * Result of a WAL segment archiving (file movement) operation. + * Replacement for the generic T3 tuple. + */ +class SegmentArchiveResult { + /** Absolute WAL segment file index. */ + private final long absWalIdx; + + /** Original work file. 
It may be, and most likely will be, used for a new WAL round. */ + private final File origWorkFile; + + /** Destination archive file. This file is a completed and closed archive segment. */ + private final File dstArchiveFile; + + /** + * Creates result. + * @param absWalIdx Absolute WAL segment file index. + * @param origWorkFile Original work file. + * @param dstArchiveFile Destination archive file. + */ + SegmentArchiveResult(long absWalIdx, File origWorkFile, File dstArchiveFile) { + this.absWalIdx = absWalIdx; + this.origWorkFile = origWorkFile; + this.dstArchiveFile = dstArchiveFile; + } + + /** @return Absolute WAL segment file index, see {@link #absWalIdx}. */ + long getAbsIdx() { + return absWalIdx; + } + + /** @return Original work file, see {@link #origWorkFile}. */ + File getOrigWorkFile() { + return origWorkFile; + } + + /** @return Destination archive file, see {@link #dstArchiveFile}. */ + File getDstArchiveFile() { + return dstArchiveFile; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java index 80c375e..2f58e3d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/SegmentEofException.java @@ -21,7 +21,8 @@ import org.apache.ignite.IgniteCheckedException; /** * This exception is thrown either when we reach the end of file of WAL segment, or when we encounter - * a record with type equal to {@code 0}. 
+ * a record with type equal to + * {@link org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType#STOP_ITERATION_RECORD_TYPE} */ public class SegmentEofException extends IgniteCheckedException { /** */ http://git-wip-us.apache.org/repos/asf/ignite/blob/44f3fac2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java new file mode 100644 index 0000000..8ea0585 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.wal.reader; + +import java.io.File; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.configuration.MemoryConfiguration; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.jetbrains.annotations.NotNull; + +/** + * Factory for creating iterators over WAL files. + */ +public class IgniteWalIteratorFactory { + /** Logger. */ + private final IgniteLogger log; + /** Page size. In standalone iterator mode this value can't be taken from memory configuration. */ + private final int pageSize; + + /** + * Creates WAL files iterator factory. + * @param log Logger. + * @param pageSize Page size; the value is validated by passing it to a temporary memory configuration. + */ + public IgniteWalIteratorFactory(@NotNull final IgniteLogger log, final int pageSize) { + this.log = log; + this.pageSize = pageSize; + new MemoryConfiguration().setPageSize(pageSize); // just for validation + } + + /** + * Creates iterator for (archive) directory scan mode. + * Note that in this mode the set of files scanned by the end of iteration may be wider than the initial set of files in the directory. + * This mode does not support work directory scan because work directory file names contain unpredictable numbers. + * Such a file may break iteration. + * + * @param walDirWithConsistentId directory with WAL files. Should already contain node consistent ID as subfolder + * @return closable WAL records iterator, should be closed when no longer needed + * @throws IgniteCheckedException if failed to read folder + */ + public WALIterator iteratorArchiveDirectory(@NotNull final File walDirWithConsistentId) throws IgniteCheckedException { + return new StandaloneWalRecordsIterator(walDirWithConsistentId, log, prepareSharedCtx()); + } + + /** + * Creates iterator for file by file scan mode. 
+ * This method may be used only for archive folder (not for work). + * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored. + * @param files files to scan. Order is not important, but it is significant to provide all segments without omissions + * @return closable WAL records iterator, should be closed when no longer needed + * @throws IgniteCheckedException if failed to read files + */ + public WALIterator iteratorArchiveFiles(@NotNull final File ...files) throws IgniteCheckedException { + return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), false, files); + } + + /** + * Creates iterator for file by file scan mode. + * This method may be used for work folder, file indexes are scanned from the file context. + * In this mode only provided WAL segments will be scanned. New WAL files created during iteration will be ignored. + * @param files files to scan. Order is not important, but it is significant to provide all segments without omissions + * @return closable WAL records iterator, should be closed when no longer needed + * @throws IgniteCheckedException if failed to read files + */ + public WALIterator iteratorWorkFiles(@NotNull final File ...files) throws IgniteCheckedException { + return new StandaloneWalRecordsIterator(log, prepareSharedCtx(), true, files); + } + + /** + * @return fake shared context required to create minimal services for record reading + */ + @NotNull private GridCacheSharedContext prepareSharedCtx() { + final GridKernalContext kernalCtx = new StandaloneGridKernalContext(log); + + final StandaloneIgniteCacheDatabaseSharedManager dbMgr = new StandaloneIgniteCacheDatabaseSharedManager(); + + dbMgr.setPageSize(pageSize); + return new GridCacheSharedContext<>( + kernalCtx, null, null, null, + null, null, dbMgr, null, + null, null, null, null, + null, null, null); + } +}
