IGNITE-8749 Exception for "no space left" situation should be propagated to FailureHandler - Fixes #4200.
Signed-off-by: Ivan Rakov <ira...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3fff8a85 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3fff8a85 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3fff8a85 Branch: refs/heads/ignite-8446 Commit: 3fff8a85e0b3297883b6bdae3b46e4d34352c630 Parents: b80ccda Author: Dmitriy Sorokin <d.w.soro...@gmail.com> Authored: Wed Jun 20 16:41:46 2018 +0300 Committer: Ivan Rakov <ira...@apache.org> Committed: Wed Jun 20 16:41:46 2018 +0300 ---------------------------------------------------------------------- .../wal/FileWriteAheadLogManager.java | 190 +++++++------- .../wal/FsyncModeFileWriteAheadLogManager.java | 235 +++++++++-------- .../ignite/failure/TestFailureHandler.java | 19 ++ .../db/wal/IgniteWalFormatFileFailoverTest.java | 258 +++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 5 files changed, 513 insertions(+), 192 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 5703d17..3ca51f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -604,48 +604,43 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** {@inheritDoc} */ @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException { - try { - assert currHnd == null; - assert lastPtr == null || lastPtr instanceof FileWALPointer; + assert currHnd == null; + assert lastPtr == null || lastPtr instanceof FileWALPointer; - FileWALPointer filePtr = (FileWALPointer)lastPtr; + FileWALPointer filePtr = (FileWALPointer)lastPtr; - walWriter = new WALWriter(log); + walWriter = new WALWriter(log); - if (!mmap) - new IgniteThread(walWriter).start(); + if (!mmap) + new IgniteThread(walWriter).start(); - currHnd = restoreWriteHandle(filePtr); + currHnd = restoreWriteHandle(filePtr); - // For new handle write serializer version to it. - if (filePtr == null) - currHnd.writeHeader(); + // For new handle write serializer version to it. + if (filePtr == null) + currHnd.writeHeader(); - if (currHnd.serializer.version() != serializer.version()) { - if (log.isInfoEnabled()) - log.info("Record serializer version change detected, will start logging with a new WAL record " + - "serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() + - ", oldVer=" + currHnd.serializer.version() + ']'); + if (currHnd.serializer.version() != serializer.version()) { + if (log.isInfoEnabled()) + log.info("Record serializer version change detected, will start logging with a new WAL record " + + "serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() + + ", oldVer=" + currHnd.serializer.version() + ']'); - rollOver(currHnd); - } + rollOver(currHnd); + } - currHnd.resume = false; + currHnd.resume = false; - if (mode == WALMode.BACKGROUND) { - backgroundFlushSchedule = cctx.time().schedule(new Runnable() { - @Override public void run() { - doFlush(); - } - }, flushFreq, flushFreq); - } - - if (walAutoArchiveAfterInactivity > 0) - scheduleNextInactivityPeriodElapsedCheck(); - } - catch (StorageException e) { - throw new IgniteCheckedException(e); + if (mode == WALMode.BACKGROUND) { + backgroundFlushSchedule = cctx.time().schedule(new Runnable() { + @Override public void run() { + doFlush(); + } + }, flushFreq, flushFreq); } + + if (walAutoArchiveAfterInactivity > 0) + scheduleNextInactivityPeriodElapsedCheck(); } /** @@ -1130,9 +1125,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @param lastReadPtr Last read WAL file pointer. * @return Initialized file write handle. - * @throws IgniteCheckedException If failed to initialize WAL write handle. + * @throws StorageException If failed to initialize WAL write handle. */ - private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException { + private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws StorageException { long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); @Nullable FileArchiver archiver0 = archiver; @@ -1174,14 +1169,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl SegmentedRingByteBuffer rbuf; if (mmap) { - try { - MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); + MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize); - rbuf = new SegmentedRingByteBuffer(buf, metrics); - } - catch (IOException e) { - throw new IgniteCheckedException(e); - } + rbuf = new SegmentedRingByteBuffer(buf, metrics); } else rbuf = new SegmentedRingByteBuffer(dsCfg.getWalBufferSize(), maxWalSegmentSize, DIRECT, metrics); @@ -1205,13 +1195,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return hnd; } catch (IgniteCheckedException | IOException e) { - fileIO.close(); + try { + fileIO.close(); + } + catch (IOException suppressed) { + e.addSuppressed(suppressed); + } - throw e; + if (e instanceof StorageException) + throw (StorageException) e; + + throw e instanceof IOException ? (IOException) e : new IOException(e); } } catch (IOException e) { - throw new IgniteCheckedException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e); + throw new StorageException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e); } } @@ -1221,10 +1219,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * @param cur Current file write handle released by WAL writer * @return Initialized file handle. - * @throws StorageException If IO exception occurred. - * @throws IgniteCheckedException If failed. + * @throws IgniteCheckedException If exception occurred. */ - private FileWriteHandle initNextWriteHandle(FileWriteHandle cur) throws StorageException, IgniteCheckedException { + private FileWriteHandle initNextWriteHandle(FileWriteHandle cur) throws IgniteCheckedException { + IgniteCheckedException error = null; + try { File nextFile = pollNextFile(cur.idx); @@ -1298,19 +1297,24 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl return hnd; } + catch (IgniteCheckedException e) { + throw error = e; + } catch (IOException e) { - StorageException se = new StorageException("Unable to initialize WAL segment", e); - - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se)); - - throw se; + throw error = new StorageException("Unable to initialize WAL segment", e); + } + finally { + if (error != null) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, error)); } } /** * Deletes temp files, creates and prepares new; Creates first segment if necessary + * + * @throws StorageException If failed. */ - private void checkOrPrepareFiles() throws IgniteCheckedException { + private void checkOrPrepareFiles() throws StorageException { // Clean temp files. { File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER); @@ -1320,7 +1324,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl boolean deleted = tmp.delete(); if (!deleted) - throw new IgniteCheckedException("Failed to delete previously created temp file " + + throw new StorageException("Failed to delete previously created temp file " + "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath()); } } @@ -1330,7 +1334,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if(isArchiverEnabled()) if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments()) - throw new IgniteCheckedException("Failed to initialize wal (work directory contains " + + throw new StorageException("Failed to initialize wal (work directory contains " + "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. @@ -1370,9 +1374,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Clears whole the file, fills with zeros for Default mode. * * @param file File to format. - * @throws IgniteCheckedException if formatting failed + * @throws StorageException if formatting failed */ - private void formatFile(File file) throws IgniteCheckedException { + private void formatFile(File file) throws StorageException { formatFile(file, dsCfg.getWalSegmentSize()); } @@ -1381,9 +1385,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * @param file File to format. * @param bytesCntToFormat Count of first bytes to format. - * @throws IgniteCheckedException if formatting failed + * @throws StorageException if formatting failed */ - private void formatFile(File file, int bytesCntToFormat) throws IgniteCheckedException { + private void formatFile(File file, int bytesCntToFormat) throws StorageException { if (log.isDebugEnabled()) log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); @@ -1395,7 +1399,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl int toWrite = Math.min(FILL_BUF.length, left); if (fileIO.write(FILL_BUF, 0, toWrite) < toWrite) { - final IgniteCheckedException ex = new IgniteCheckedException("Failed to extend WAL segment file: " + + final StorageException ex = new StorageException("Failed to extend WAL segment file: " + file.getName() + ". Probably disk is too busy, please check your device."); if (failureProcessor != null) @@ -1413,7 +1417,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl fileIO.clear(); } catch (IOException e) { - throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e); + throw new StorageException("Failed to format WAL segment file: " + file.getAbsolutePath(), e); } } @@ -1421,9 +1425,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Creates a file atomically with temp file. * * @param file File to create. - * @throws IgniteCheckedException If failed. + * @throws StorageException If failed. */ - private void createFile(File file) throws IgniteCheckedException { + private void createFile(File file) throws StorageException { if (log.isDebugEnabled()) log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); @@ -1435,7 +1439,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Files.move(tmp.toPath(), file.toPath()); } catch (IOException e) { - throw new IgniteCheckedException("Failed to move temp file to a regular WAL segment file: " + + throw new StorageException("Failed to move temp file to a regular WAL segment file: " + file.getAbsolutePath(), e); } @@ -1448,9 +1452,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * @param curIdx Current absolute WAL segment index. * @return File ready for use as new WAL segment. - * @throws IgniteCheckedException If failed. + * @throws StorageException If exception occurred in the archiver thread. */ - private File pollNextFile(long curIdx) throws IgniteCheckedException { + private File pollNextFile(long curIdx) throws StorageException { FileArchiver archiver0 = archiver; if (archiver0 == null) { @@ -1526,7 +1530,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ private class FileArchiver extends GridWorker { /** Exception which occurred during initial creation of files or during archiving WAL segment */ - private IgniteCheckedException cleanErr; + private StorageException cleanErr; /** * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. Incremented during @@ -1598,15 +1602,17 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl try { allocateRemainingFiles(); } - catch (IgniteCheckedException e) { + catch (StorageException e) { synchronized (this) { // Stop the thread and report to starter. cleanErr = e; notifyAll(); - - return; } + + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); + + return; } Throwable err = null; @@ -1690,9 +1696,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * @param curIdx Current absolute index that we want to increment. * @return Next index (curWalSegmIdx+1) when it is ready to be written. - * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread). + * @throws StorageException If exception occurred in the archiver thread. */ - private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException { + private long nextAbsoluteSegmentIndex(long curIdx) throws StorageException { synchronized (this) { if (cleanErr != null) throw cleanErr; @@ -1707,6 +1713,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl while (curAbsWalIdx - lastAbsArchivedIdx > dsCfg.getWalSegments() && cleanErr == null) { try { wait(); + + if (cleanErr != null) + throw cleanErr; } catch (InterruptedException ignore) { interrupted.set(true); @@ -1714,9 +1723,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } // Wait for formatter so that we do not open an empty file in DEFAULT mode. - while (curAbsWalIdx % dsCfg.getWalSegments() > formatted) + while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && cleanErr == null) try { wait(); + + if (cleanErr != null) + throw cleanErr; } catch (InterruptedException ignore) { interrupted.set(true); @@ -1788,7 +1800,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * * @param absIdx Absolute index to archive. */ - private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException { + private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException { long segIdx = absIdx % dsCfg.getWalSegments(); File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx)); @@ -1817,7 +1829,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } catch (IOException e) { - throw new IgniteCheckedException("Failed to archive WAL segment [" + + throw new StorageException("Failed to archive WAL segment [" + "srcFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e); } @@ -1840,7 +1852,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Background creation of all segments except first. First segment was created in main thread by {@link * FileWriteAheadLogManager#checkOrPrepareFiles()} */ - private void allocateRemainingFiles() throws IgniteCheckedException { + private void allocateRemainingFiles() throws StorageException { checkFiles( 1, true, @@ -2234,23 +2246,23 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * @param startWith Start with. * @param create Flag create file. * @param p Predicate Exit condition. - * @throws IgniteCheckedException if validation or create file fail. + * @throws StorageException if validation or create file fail. */ private void checkFiles( int startWith, boolean create, @Nullable IgnitePredicate<Integer> p, @Nullable IgniteInClosure<Integer> completionCallback - ) throws IgniteCheckedException { + ) throws StorageException { for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || p.apply(i)); i++) { File checkFile = new File(walWorkDir, FileDescriptor.fileName(i)); if (checkFile.exists()) { if (checkFile.isDirectory()) - throw new IgniteCheckedException("Failed to initialize WAL log segment (a directory with " + + throw new StorageException("Failed to initialize WAL log segment (a directory with " + "the same name already exists): " + checkFile.getAbsolutePath()); else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC) - throw new IgniteCheckedException("Failed to initialize WAL log segment " + + throw new StorageException("Failed to initialize WAL log segment " + "(WAL segment size change is not supported in 'DEFAULT' WAL mode) " + "[filePath=" + checkFile.getAbsolutePath() + ", fileSize=" + checkFile.length() + @@ -2650,9 +2662,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl * Flush or wait for concurrent flush completion. * * @param ptr Pointer. - * @throws IgniteCheckedException If failed. */ - private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException { + private void flushOrWait(FileWALPointer ptr) { if (ptr != null) { // If requested obsolete file index, it must be already flushed by close. if (ptr.index() != idx) @@ -2664,10 +2675,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * @param ptr Pointer. - * @throws IgniteCheckedException If failed. - * @throws StorageException If failed. */ - private void flush(FileWALPointer ptr) throws IgniteCheckedException, StorageException { + private void flush(FileWALPointer ptr) { if (ptr == null) { // Unconditional flush. walWriter.flushAll(); @@ -2883,7 +2892,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } catch (IOException e) { - throw new IgniteCheckedException(e); + throw new StorageException("Failed to close WAL write handle [idx=" + idx + "]", e); } if (log.isDebugEnabled()) @@ -3364,28 +3373,29 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * Forces all made changes to the file. */ - void force() throws IgniteCheckedException { + void force() { flushBuffer(FILE_FORCE); } /** * Closes file. */ - void close() throws IgniteCheckedException { + void close() { flushBuffer(FILE_CLOSE); } /** * Flushes all data from the buffer. */ - void flushAll() throws IgniteCheckedException { + void flushAll() { flushBuffer(UNCONDITIONAL_FLUSH); } /** * @param expPos Expected position. */ - void flushBuffer(long expPos) throws StorageException, IgniteCheckedException { + @SuppressWarnings("ForLoopReplaceableByForEach") + void flushBuffer(long expPos) { if (mmap) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java index 49fbc73..6e59ad3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java @@ -487,37 +487,32 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** {@inheritDoc} */ @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException { - try { - assert currentHnd == null; - assert lastPtr == null || lastPtr instanceof FileWALPointer; - - FileWALPointer filePtr = (FileWALPointer)lastPtr; + assert currentHnd == null; + assert lastPtr == null || lastPtr instanceof FileWALPointer; - currentHnd = restoreWriteHandle(filePtr); - - if (currentHnd.serializer.version() != serializer.version()) { - if (log.isInfoEnabled()) - log.info("Record serializer version change detected, will start logging with a new WAL record " + - "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() + - ", oldVer=" + currentHnd.serializer.version() + ']'); + FileWALPointer filePtr = (FileWALPointer)lastPtr; - rollOver(currentHnd); - } + currentHnd = restoreWriteHandle(filePtr); - if (mode == WALMode.BACKGROUND) { - backgroundFlushSchedule = cctx.time().schedule(new Runnable() { - @Override public void run() { - doFlush(); - } - }, flushFreq, flushFreq); - } + if (currentHnd.serializer.version() != serializer.version()) { + if (log.isInfoEnabled()) + log.info("Record serializer version change detected, will start logging with a new WAL record " + + "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() + + ", oldVer=" + currentHnd.serializer.version() + ']'); - if (walAutoArchiveAfterInactivity > 0) - scheduleNextInactivityPeriodElapsedCheck(); + rollOver(currentHnd); } - catch (StorageException e) { - throw new IgniteCheckedException(e); + + if (mode == WALMode.BACKGROUND) { + backgroundFlushSchedule = cctx.time().schedule(new Runnable() { + @Override public void run() { + doFlush(); + } + }, flushFreq, flushFreq); } + + if (walAutoArchiveAfterInactivity > 0) + scheduleNextInactivityPeriodElapsedCheck(); } /** @@ -1019,7 +1014,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * @param cur Handle that failed to fit the given entry. * @return Handle that will fit the entry. */ - private FileWriteHandle rollOver(FileWriteHandle cur) throws StorageException, IgniteCheckedException { + private FileWriteHandle rollOver(FileWriteHandle cur) throws IgniteCheckedException { FileWriteHandle hnd = currentHandle(); if (hnd != cur) @@ -1050,9 +1045,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @param lastReadPtr Last read WAL file pointer. * @return Initialized file write handle. - * @throws IgniteCheckedException If failed to initialize WAL write handle. + * @throws StorageException If failed to initialize WAL write handle. */ - private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException { + private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws StorageException { long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); long segNo = absIdx % dsCfg.getWalSegments(); @@ -1100,13 +1095,21 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda return hnd; } catch (IgniteCheckedException | IOException e) { - fileIO.close(); + try { + fileIO.close(); + } + catch (IOException suppressed) { + e.addSuppressed(suppressed); + } - throw e; + if (e instanceof StorageException) + throw (StorageException) e; + + throw e instanceof IOException ? (IOException) e : new IOException(e); } } catch (IOException e) { - throw new IgniteCheckedException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e); + throw new StorageException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e); } } @@ -1117,10 +1120,11 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * * @param curIdx current absolute segment released by WAL writer * @return Initialized file handle. - * @throws StorageException If IO exception occurred. - * @throws IgniteCheckedException If failed. + * @throws IgniteCheckedException If exception occurred. */ - private FileWriteHandle initNextWriteHandle(long curIdx) throws StorageException, IgniteCheckedException { + private FileWriteHandle initNextWriteHandle(long curIdx) throws IgniteCheckedException { + IgniteCheckedException error = null; + try { File nextFile = pollNextFile(curIdx); @@ -1140,19 +1144,24 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda return hnd; } + catch (IgniteCheckedException e) { + throw error = e; + } catch (IOException e) { - StorageException se = new StorageException("Unable to initialize WAL segment", e); - - cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se)); - - throw se; + throw error = new StorageException("Unable to initialize WAL segment", e); + } + finally { + if (error != null) + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, error)); } } /** - * Deletes temp files, creates and prepares new; Creates first segment if necessary + * Deletes temp files, creates and prepares new; Creates first segment if necessary. + * + * @throws StorageException If failed. */ - private void checkOrPrepareFiles() throws IgniteCheckedException { + private void checkOrPrepareFiles() throws StorageException { // Clean temp files. { File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER); @@ -1162,7 +1171,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda boolean deleted = tmp.delete(); if (!deleted) - throw new IgniteCheckedException("Failed to delete previously created temp file " + + throw new StorageException("Failed to delete previously created temp file " + "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath()); } } @@ -1171,7 +1180,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER); if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments()) - throw new IgniteCheckedException("Failed to initialize wal (work directory contains " + + throw new StorageException("Failed to initialize wal (work directory contains " + "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. @@ -1188,9 +1197,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * Clears whole the file, fills with zeros for Default mode. * * @param file File to format. - * @throws IgniteCheckedException if formatting failed + * @throws StorageException if formatting failed. */ - private void formatFile(File file) throws IgniteCheckedException { + private void formatFile(File file) throws StorageException { formatFile(file, dsCfg.getWalSegmentSize()); } @@ -1199,9 +1208,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * * @param file File to format. * @param bytesCntToFormat Count of first bytes to format. - * @throws IgniteCheckedException if formatting failed + * @throws StorageException If formatting failed. */ - private void formatFile(File file, int bytesCntToFormat) throws IgniteCheckedException { + private void formatFile(File file, int bytesCntToFormat) throws StorageException { if (log.isDebugEnabled()) log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); @@ -1223,7 +1232,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda fileIO.clear(); } catch (IOException e) { - throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e); + throw new StorageException("Failed to format WAL segment file: " + file.getAbsolutePath(), e); } } @@ -1231,9 +1240,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * Creates a file atomically with temp file. * * @param file File to create. - * @throws IgniteCheckedException If failed. + * @throws StorageException If failed. */ - private void createFile(File file) throws IgniteCheckedException { + private void createFile(File file) throws StorageException { if (log.isDebugEnabled()) log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); @@ -1245,7 +1254,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda Files.move(tmp.toPath(), file.toPath()); } catch (IOException e) { - throw new IgniteCheckedException("Failed to move temp file to a regular WAL segment file: " + + throw new StorageException("Failed to move temp file to a regular WAL segment file: " + file.getAbsolutePath(), e); } @@ -1259,9 +1268,10 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * * @param curIdx Current absolute WAL segment index. * @return File ready for use as new WAL segment. - * @throws IgniteCheckedException If failed. + * @throws StorageException If exception occurred in the archiver thread. + * @throws IgniteInterruptedCheckedException If interrupted. */ - private File pollNextFile(long curIdx) throws IgniteCheckedException { + private File pollNextFile(long curIdx) throws StorageException, IgniteInterruptedCheckedException { // Signal to archiver that we are done with the segment and it can be archived. long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx); @@ -1318,7 +1328,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda */ private class FileArchiver extends GridWorker { /** Exception which occurred during initial creation of files or during archiving WAL segment */ - private IgniteCheckedException cleanException; + private StorageException cleanException; /** * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. @@ -1426,15 +1436,17 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda try { allocateRemainingFiles(); } - catch (IgniteCheckedException e) { + catch (StorageException e) { synchronized (this) { // Stop the thread and report to starter. cleanException = e; notifyAll(); - - return; } + + cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); + + return; } Throwable err = null; @@ -1515,9 +1527,10 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * * @param curIdx Current absolute index that we want to increment. * @return Next index (curWalSegmIdx+1) when it is ready to be written. - * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread). + * @throws StorageException If exception occurred in the archiver thread. + * @throws IgniteInterruptedCheckedException If interrupted. */ - private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException { + private long nextAbsoluteSegmentIndex(long curIdx) throws StorageException, IgniteInterruptedCheckedException { try { synchronized (this) { if (cleanException != null) @@ -1535,10 +1548,16 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda while ((curAbsWalIdx - lastAbsArchivedIdx > segments && cleanException == null)) wait(); + if (cleanException != null) + throw cleanException; + // Wait for formatter so that we do not open an empty file in DEFAULT mode. - while (curAbsWalIdx % dsCfg.getWalSegments() > formatted) + while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && cleanException == null) wait(); + if (cleanException != null) + throw cleanException; + return curAbsWalIdx; } } @@ -1664,7 +1683,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * Background creation of all segments except first. First segment was created in main thread by * {@link FsyncModeFileWriteAheadLogManager#checkOrPrepareFiles()} */ - private void allocateRemainingFiles() throws IgniteCheckedException { + private void allocateRemainingFiles() throws StorageException { final FileArchiver archiver = this; checkFiles(1, @@ -2029,23 +2048,23 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * @param startWith Start with. * @param create Flag create file. * @param p Predicate Exit condition. - * @throws IgniteCheckedException if validation or create file fail. + * @throws StorageException if validation or create file fail. */ private void checkFiles( int startWith, boolean create, @Nullable IgnitePredicate<Integer> p, @Nullable IgniteInClosure<Integer> completionCallback - ) throws IgniteCheckedException { + ) throws StorageException { for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || (p != null && p.apply(i))); i++) { File checkFile = new File(walWorkDir, FileDescriptor.fileName(i)); if (checkFile.exists()) { if (checkFile.isDirectory()) - throw new IgniteCheckedException("Failed to initialize WAL log segment (a directory with " + + throw new StorageException("Failed to initialize WAL log segment (a directory with " + "the same name already exists): " + checkFile.getAbsolutePath()); else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC) - throw new IgniteCheckedException("Failed to initialize WAL log segment " + + throw new StorageException("Failed to initialize WAL log segment " + "(WAL segment size change is not supported):" + checkFile.getAbsolutePath()); } else if (create) @@ -2410,7 +2429,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * * @throws IOException If fail to write serializer version. */ - public void writeSerializerVersion() throws IOException { + private void writeSerializerVersion() throws IOException { try { assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " + fileIO.position(); @@ -2448,9 +2467,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * @param rec Record to be added to record chain as new {@link #head} * @return Pointer or null if roll over to next segment is required or already started by other thread. * @throws StorageException If failed. - * @throws IgniteCheckedException If failed. */ - @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException { + @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException { assert rec.size() > 0 || rec.getClass() == FakeRecord.class; boolean flushed = false; @@ -2503,9 +2521,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * Flush or wait for concurrent flush completion. * * @param ptr Pointer. - * @throws IgniteCheckedException If failed. + * @throws StorageException If failed. */ - private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException { + private void flushOrWait(FileWALPointer ptr, boolean stop) throws StorageException { long expWritten; if (ptr != null) { @@ -2549,10 +2567,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @param ptr Pointer. * @return {@code true} If the flush really happened. - * @throws IgniteCheckedException If failed. * @throws StorageException If failed. */ - private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException { + private boolean flush(FileWALPointer ptr, boolean stop) throws StorageException { if (ptr == null) { // Unconditional flush. for (; ; ) { WALRecord expHead = head.get(); @@ -2594,10 +2611,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @param expHead Expected head of chain. If head was changed, flush is not performed in this thread - * @throws IgniteCheckedException If failed. * @throws StorageException If failed. */ - private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException { + private boolean flush(WALRecord expHead, boolean stop) throws StorageException { if (expHead.previous() == null) { FakeRecord frHead = (FakeRecord)expHead; @@ -2643,7 +2659,8 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda return true; } catch (Throwable e) { - StorageException se = new StorageException("Unable to write", new IOException(e)); + StorageException se = e instanceof StorageException ? (StorageException) e : + new StorageException("Unable to write", new IOException(e)); cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se)); @@ -2725,8 +2742,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @param ptr Pointer to sync. * @throws StorageException If failed. + * @throws IgniteInterruptedCheckedException If interrupted. */ - private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteCheckedException { + private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteInterruptedCheckedException { lock.lock(); try { @@ -2780,10 +2798,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda /** * @return {@code true} If this thread actually closed the segment. - * @throws IgniteCheckedException If failed. * @throws StorageException If failed. */ - private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException { + private boolean close(boolean rollOver) throws StorageException { if (stop.compareAndSet(false, true)) { lock.lock(); @@ -2793,43 +2810,49 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda assert stopped() : "Segment is not closed after close flush: " + head.get(); try { - RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx) - .createSerializer(serializerVersion); + try { + RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx) + .createSerializer(serializerVersion); - SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord(); + SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord(); - int switchSegmentRecSize = backwardSerializer.size(segmentRecord); + int switchSegmentRecSize = backwardSerializer.size(segmentRecord); - if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { - final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); + if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { + final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); - segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize)); - backwardSerializer.writeRecord(segmentRecord, buf); + segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize)); + backwardSerializer.writeRecord(segmentRecord, buf); - buf.rewind(); + buf.rewind(); - int rem = buf.remaining(); + int rem = buf.remaining(); - while (rem > 0) { - int written0 = fileIO.write(buf, written); + while (rem > 0) { + int written0 = fileIO.write(buf, written); - written += written0; + written += written0; - rem -= written0; + rem -= written0; + } } } + catch (IgniteCheckedException e) { + throw new IOException(e); + } + finally { + assert mode == WALMode.FSYNC; - // Do the final fsync. - if (mode == WALMode.FSYNC) { + // Do the final fsync. fileIO.force(); lastFsyncPos = written; - } - fileIO.close(); + fileIO.close(); + } } catch (IOException e) { - throw new IgniteCheckedException(e); + throw new StorageException("Failed to close WAL write handle [idx=" + idx + "]", e); } if (log.isDebugEnabled()) @@ -2860,9 +2883,17 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda assert written == lastFsyncPos || mode != WALMode.FSYNC : "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']'; - } - fileIO = null; + fileIO = null; + } + else { + try { + fileIO.close(); + } + catch (IOException e) { + U.error(log, "Failed to close WAL file [idx=" + idx + ", fileIO=" + fileIO + "]", e); + } + } nextSegment.signalAll(); } @@ -2872,9 +2903,9 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda } /** - * @throws IgniteCheckedException If failed. + * */ - private void awaitNext() throws IgniteCheckedException { + private void awaitNext() { lock.lock(); try { @@ -2894,7 +2925,7 @@ public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAda * @throws IgniteCheckedException If failed. */ @SuppressWarnings("TooBroadScope") - private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException { + private void writeBuffer(long pos, ByteBuffer buf) throws StorageException { boolean interrupted = false; lock.lock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java index 1159683..545c9ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/TestFailureHandler.java @@ -18,6 +18,7 @@ package org.apache.ignite.failure; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.ignite.Ignite; /** @@ -35,6 +36,13 @@ public class TestFailureHandler implements FailureHandler { /** * @param invalidate Invalidate. + */ + public TestFailureHandler(boolean invalidate) { + this(invalidate, new CountDownLatch(1)); + } + + /** + * @param invalidate Invalidate. * @param latch Latch. */ public TestFailureHandler(boolean invalidate, CountDownLatch latch) { @@ -60,4 +68,15 @@ public class TestFailureHandler implements FailureHandler { public FailureContext failureContext() { return failureCtx; } + + /** + * @param millis Millis. + + * @return Failure context. + */ + public FailureContext awaitFailure(long millis) throws InterruptedException { + latch.await(millis, TimeUnit.MILLISECONDS); + + return failureCtx; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java new file mode 100644 index 0000000..379b8c3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalFormatFileFailoverTest.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.io.File; +import java.io.IOException; +import java.nio.file.OpenOption; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.failure.FailureHandler; +import org.apache.ignite.failure.TestFailureHandler; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.pagemem.wal.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.WRITE; + +/** + * + */ +public class IgniteWalFormatFileFailoverTest extends GridCommonAbstractTest { + /** */ + private static final String TEST_CACHE = "testCache"; + + /** */ + private static final String formatFile = "formatFile"; + + /** Fail method name reference. */ + private final AtomicReference<String> failMtdNameRef = new AtomicReference<>(); + + /** */ + private boolean fsync; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setCacheConfiguration(new CacheConfiguration(TEST_CACHE) + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)); + + DataStorageConfiguration memCfg = new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setMaxSize(2048L * 1024 * 1024) + .setPersistenceEnabled(true)) + .setWalMode(fsync ? WALMode.FSYNC : WALMode.BACKGROUND) + .setWalBufferSize(1024 * 1024) + .setWalSegmentSize(512 * 1024) + .setFileIOFactory(new FailingFileIOFactory(failMtdNameRef)); + + cfg.setDataStorageConfiguration(memCfg); + + cfg.setFailureHandler(new TestFailureHandler(false)); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testNodeStartFailedFsync() throws Exception { + fsync = true; + + failMtdNameRef.set(formatFile); + + checkCause(GridTestUtils.assertThrows(log, () -> startGrid(0), IgniteCheckedException.class, null)); + } + + /** + * @throws Exception If failed. + */ + public void testFailureHandlerTriggeredFsync() throws Exception { + fsync = true; + + failFormatFileOnClusterActivate(); + } + + /** + * @throws Exception If failed. + */ + public void testFailureHandlerTriggered() throws Exception { + fsync = false; + + failFormatFileOnClusterActivate(); + } + + /** + * @throws Exception If failed. + */ + private void failFormatFileOnClusterActivate() throws Exception { + failMtdNameRef.set(null); + + startGrid(0); + startGrid(1); + + if (!fsync) { + setFileIOFactory(grid(0).context().cache().context().wal()); + setFileIOFactory(grid(1).context().cache().context().wal()); + } + + failMtdNameRef.set(formatFile); + + grid(0).cluster().active(true); + + checkCause(failureHandler(0).awaitFailure(2000).error()); + checkCause(failureHandler(1).awaitFailure(2000).error()); + } + + /** + * @param mtdName Method name. + */ + private static boolean isCalledFrom(String mtdName) { + return isCalledFrom(Thread.currentThread().getStackTrace(), mtdName); + } + + /** + * @param stackTrace Stack trace. + * @param mtdName Method name. + */ + private static boolean isCalledFrom(StackTraceElement[] stackTrace, String mtdName) { + return Arrays.stream(stackTrace).map(StackTraceElement::getMethodName).anyMatch(mtdName::equals); + } + + /** + * @param gridIdx Grid index. + * @return Failure handler configured for grid with given index. + */ + private TestFailureHandler failureHandler(int gridIdx) { + FailureHandler hnd = grid(gridIdx).configuration().getFailureHandler(); + + assertTrue(hnd instanceof TestFailureHandler); + + return (TestFailureHandler)hnd; + } + + /** + * @param t Throwable. + */ + private void checkCause(Throwable t) { + StorageException e = X.cause(t, StorageException.class); + + assertNotNull(e); + assertNotNull(e.getMessage()); + assertTrue(e.getMessage().contains("Failed to format WAL segment file")); + + IOException ioe = X.cause(e, IOException.class); + + assertNotNull(ioe); + assertNotNull(ioe.getMessage()); + assertTrue(ioe.getMessage().contains("No space left on device")); + + assertTrue(isCalledFrom(ioe.getStackTrace(), formatFile)); + } + + /** */ + private void setFileIOFactory(IgniteWriteAheadLogManager wal) { + if (wal instanceof FileWriteAheadLogManager) + ((FileWriteAheadLogManager)wal).setFileIOFactory(new FailingFileIOFactory(failMtdNameRef)); + else + fail(wal.getClass().toString()); + } + + /** + * Create File I/O which fails if specific method call present in stack trace. + */ + private static class FailingFileIOFactory implements FileIOFactory { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Delegate factory. */ + private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory(); + + /** Fail method name reference. */ + private final AtomicReference<String> failMtdNameRef; + + /** + * @param failMtdNameRef Fail method name reference. + */ + FailingFileIOFactory(AtomicReference<String> failMtdNameRef) { + assertNotNull(failMtdNameRef); + + this.failMtdNameRef = failMtdNameRef; + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file) throws IOException { + return create(file, CREATE, READ, WRITE); + } + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + final FileIO delegate = delegateFactory.create(file, modes); + + return new FileIODecorator(delegate) { + @Override public int write(byte[] buf, int off, int len) throws IOException { + conditionalFail(); + + return super.write(buf, off, len); + } + + @Override public void clear() throws IOException { + conditionalFail(); + + super.clear(); + } + + private void conditionalFail() throws IOException { + String failMtdName = failMtdNameRef.get(); + + if (failMtdName != null && isCalledFrom(failMtdName)) + throw new IOException("No space left on device"); + } + }; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3fff8a85/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 6a4f9fe..3b46761 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -50,6 +50,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushFsyncWithMmapBufferSelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlySelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlyWithMmapBufferSelfTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFormatFileFailoverTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest; @@ -150,6 +151,8 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteWalFlushLogOnlyWithMmapBufferSelfTest.class); + suite.addTestSuite(IgniteWalFormatFileFailoverTest.class); + // Test suite uses Standalone WAL iterator to verify PDS content. suite.addTestSuite(IgniteWalReaderTest.class);