IGNITE-5772 - Fixed race between WAL segment rollover and a concurrent log. Closes #2313
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6de0571c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6de0571c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6de0571c Branch: refs/heads/ignite-5757 Commit: 6de0571c21ffdb77af7bb1d18e9659126d7f321b Parents: 199b954 Author: Ilya Lantukh <[email protected]> Authored: Fri Jul 21 16:35:43 2017 +0300 Committer: Alexey Goncharuk <[email protected]> Committed: Fri Jul 21 16:35:43 2017 +0300 ---------------------------------------------------------------------- .../wal/FileWriteAheadLogManager.java | 93 +++++++++++++------- 1 file changed, 61 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6de0571c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 897f903..b655ddf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -319,7 +319,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { if (mode == WALMode.BACKGROUND) { if (currHnd != null) - currHnd.flush((FileWALPointer)null); + currHnd.flush((FileWALPointer)null, true); } if (currHnd != null) @@ -526,7 +526,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return; if (mode == WALMode.LOG_ONLY || forceFlush) { - 
cur.flushOrWait(filePtr); + cur.flushOrWait(filePtr, false); return; } @@ -535,7 +535,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (filePtr != null && !cur.needFsync(filePtr)) return; - cur.fsync(filePtr); + cur.fsync(filePtr, false); } /** {@inheritDoc} */ @@ -1700,12 +1700,29 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl this.maxSegmentSize = maxSegmentSize; this.serializer = serializer; - head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0))); + head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), false)); written = pos; lastFsyncPos = pos; } /** + * Checks if current head is a close fake record and returns {@code true} if so. + * + * @return {@code true} if current head is close record. + */ + private boolean stopped() { + return stopped(head.get()); + } + + /** + * @param record Record to check. + * @return {@code true} if the record is fake close record. + */ + private boolean stopped(WALRecord record) { + return record instanceof FakeRecord && ((FakeRecord)record).stop; + } + + /** * @param rec Record to be added to record chain as new {@link #head} * @return Pointer or null if roll over to next segment is required or already started by other thread. * @throws StorageException If failed. @@ -1721,9 +1738,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl long nextPos = nextPosition(h); - // It is important that we read `stop` after `head` in this loop for correct close, - // because otherwise we will have a race on the last flush in close. - if (nextPos + rec.size() >= maxSegmentSize || stop.get()) { + if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) { // Can not write to this segment, need to switch to the next one. 
return null; } @@ -1731,7 +1746,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int newChainSize = h.chainSize() + rec.size(); if (newChainSize > tlbSize && !flushed) { - boolean res = h.previous() == null || flush(h); + boolean res = h.previous() == null || flush(h, false); if (rec.size() > tlbSize) flushed = res; @@ -1770,7 +1785,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param ptr Pointer. * @throws IgniteCheckedException If failed. */ - private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException { + private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException { long expWritten; if (ptr != null) { @@ -1783,7 +1798,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl else // We read head position before the flush because otherwise we can get wrong position. expWritten = recordOffset(head.get()); - if (flush(ptr)) + if (flush(ptr, stop)) return; // Spin-wait for a while before acquiring the lock. @@ -1810,18 +1825,20 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws IgniteCheckedException If failed. * @throws StorageException If failed. */ - private boolean flush(FileWALPointer ptr) throws IgniteCheckedException, StorageException { + private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException { if (ptr == null) { // Unconditional flush. 
for (; ; ) { WALRecord expHead = head.get(); if (expHead.previous() == null) { - assert expHead instanceof FakeRecord; + FakeRecord frHead = (FakeRecord)expHead; - return false; + if (frHead.stop == stop || frHead.stop || + head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop))) + return false; } - if (flush(expHead)) + if (flush(expHead, stop)) return true; } } @@ -1835,7 +1852,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (chainBeginPosition(h) > ptr.fileOffset()) return false; - if (flush(h)) + if (flush(h, stop)) return true; // We are lucky. } } @@ -1853,17 +1870,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws IgniteCheckedException If failed. * @throws StorageException If failed. */ - private boolean flush(WALRecord expHead) throws StorageException, IgniteCheckedException { + private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException { if (expHead.previous() == null) { - assert expHead instanceof FakeRecord; + FakeRecord frHead = (FakeRecord)expHead; - return false; + if (!stop || frHead.stop) /* Only a stop flush may seal an idle head; never CAS a stopped (terminal) fake head back to a non-stop one. */ + return false; } // Fail-fast before CAS. checkEnvironment(); - if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0)))) + if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0), stop))) return false; // At this point we grabbed the piece of WAL chain. @@ -1976,7 +1994,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param ptr Pointer to sync. * @throws StorageException If failed.
*/ - private void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException { + private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteCheckedException { lock.lock(); try { @@ -1984,7 +2002,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (!needFsync(ptr)) return; - if (fsyncDelay > 0 && !stop.get()) { + if (fsyncDelay > 0 && !stopped()) { // Delay fsync to collect as many updates as possible: trade latency for throughput. U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS); @@ -1993,7 +2011,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } - flushOrWait(ptr); + flushOrWait(ptr, stop); if (lastFsyncPos != written) { assert lastFsyncPos < written; // Fsync position must be behind. @@ -2031,13 +2049,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @throws StorageException If failed. */ private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException { - if (stop.compareAndSet(false, true)) { - // Here we can be sure that no other records will be added and this fsync will be the last. - if (mode == WALMode.DEFAULT) - fsync(null); - else - flushOrWait(null); + if (mode == WALMode.DEFAULT) + fsync(null, true); + else + flushOrWait(null, true); + + assert stopped() : "Segment is not closed after close flush: " + head.get(); + if (stop.compareAndSet(false, true)) { try { int switchSegmentRecSize = RecordV1Serializer.REC_TYPE_SIZE + RecordV1Serializer.FILE_WAL_POINTER_SIZE; @@ -2068,8 +2087,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return true; } - - return false; + else + return false; } /** @@ -2271,17 +2290,27 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Fake record is allowed to have no previous record. 
*/ private static final class FakeRecord extends WALRecord { + /** */ + private final boolean stop; + /** * @param pos Position. */ - FakeRecord(FileWALPointer pos) { + FakeRecord(FileWALPointer pos, boolean stop) { position(pos); + + this.stop = stop; } /** {@inheritDoc} */ @Override public RecordType type() { return null; } + + /** {@inheritDoc} */ + @Override public FileWALPointer position() { + return (FileWALPointer) super.position(); + } } /** @@ -2492,7 +2521,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private void doFlush() { final FileWriteHandle hnd = currentHandle(); try { - hnd.flush(hnd.head.get()); + hnd.flush(hnd.head.get(), false); } catch (Exception e) { U.warn(log, "Failed to flush WAL record queue", e);
