This is an automated email from the ASF dual-hosted git repository. agoncharuk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new c32c7ac IGNITE-11632 Correctly handle crc errors in no archiver mode - Fixes #6345. c32c7ac is described below commit c32c7acba204fa6b32616d3b725198f85df97b15 Author: mstepachev <maksim.stepac...@gmail.com> AuthorDate: Thu Apr 4 16:16:15 2019 +0300 IGNITE-11632 Correctly handle crc errors in no archiver mode - Fixes #6345. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> --- .../GridCacheDatabaseSharedManager.java | 84 +++++--- .../persistence/wal/FileWriteAheadLogManager.java | 70 +++++-- .../IgniteAbstractWalIteratorInvalidCrcTest.java | 53 +---- ...teWithoutArchiverWalIteratorInvalidCrcTest.java | 219 +++++++++++++++++++++ .../cache/persistence/db/wal/crc/WalTestUtils.java | 147 ++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 2 + 6 files changed, 477 insertions(+), 98 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index cece907..1c1a611 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -40,7 +40,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.NoSuchElementException; import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -1021,7 +1021,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan true ); - WALPointer restored = binaryState.lastReadRecordPointer().map(FileWALPointer::next).orElse(null); + WALPointer restored = binaryState.lastReadRecordPointer(); + + if(restored.equals(CheckpointStatus.NULL_PTR)) + restored = null; // This record is first + else + restored = restored.next(); if (restored == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) { throw new StorageException("The memory cannot be restored. The critical part of WAL archive is missing " + @@ -2044,7 +2049,11 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan dumpPartitionsInfo(cctx, log); } - walTail = tailPointer(logicalState.lastReadRecordPointer().orElse(null)); + // We must return null for NULL_PTR record, because FileWriteAheadLogManager.resumeLogging + // can't write header without that condition. + WALPointer lastReadPointer = logicalState.lastReadRecordPointer(); + + walTail = tailPointer(lastReadPointer.equals(CheckpointStatus.NULL_PTR) ? null : lastReadPointer); cctx.wal().onDeActivate(kctx); } @@ -2171,30 +2180,38 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan boolean apply = status.needRestoreMemory(); - if (apply) { - if (finalizeState) - U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " + - "finish checkpoint on node start."); + try { + WALRecord startRec = !CheckpointStatus.NULL_PTR.equals(status.startPtr) || apply ? cctx.wal().read(status.startPtr) : null; - cctx.pageStore().beginRecover(); + if (apply) { + if (finalizeState) + U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " + + "finish checkpoint on node start."); - WALRecord rec = cctx.wal().read(status.startPtr); + cctx.pageStore().beginRecover(); - if (!(rec instanceof CheckpointRecord)) - throw new StorageException("Checkpoint marker doesn't point to checkpoint record " + - "[ptr=" + status.startPtr + ", rec=" + rec + "]"); + if (!(startRec instanceof CheckpointRecord)) + throw new StorageException("Checkpoint marker doesn't point to checkpoint record " + + "[ptr=" + status.startPtr + ", rec=" + startRec + "]"); - WALPointer cpMark = ((CheckpointRecord)rec).checkpointMark(); + WALPointer cpMark = ((CheckpointRecord)startRec).checkpointMark(); - if (cpMark != null) { - log.info("Restoring checkpoint after logical recovery, will start physical recovery from " + - "back pointer: " + cpMark); + if (cpMark != null) { + log.info("Restoring checkpoint after logical recovery, will start physical recovery from " + + "back pointer: " + cpMark); - recPtr = cpMark; + recPtr = cpMark; + } } + else + cctx.wal().notchLastCheckpointPtr(status.startPtr); + } + catch (NoSuchElementException e) { + throw new StorageException("Failed to read checkpoint record from WAL, persistence consistency " + + "cannot be guaranteed. Make sure configuration points to correct WAL folders and WAL folder is " + + "properly mounted [ptr=" + status.startPtr + ", walPath=" + persistenceCfg.getWalPath() + + ", walArchive=" + persistenceCfg.getWalArchivePath() + "]"); } - else - cctx.wal().notchLastCheckpointPtr(status.startPtr); AtomicReference<IgniteCheckedException> applyError = new AtomicReference<>(); @@ -2330,7 +2347,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan if (!finalizeState) return null; - FileWALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer().orElse(null); + FileWALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer(); if (status.needRestoreMemory()) { if (restoreBinaryState.needApplyBinaryUpdate()) @@ -2676,7 +2693,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan WALIterator it = cctx.wal().replay(status.startPtr, recordTypePredicate); - RestoreLogicalState restoreLogicalState = new RestoreLogicalState(it, lastArchivedSegment, cacheGroupsPredicate); + RestoreLogicalState restoreLogicalState = new RestoreLogicalState(status, it, lastArchivedSegment, cacheGroupsPredicate); try { while (it.hasNextX()) { @@ -5417,6 +5434,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** Last archived segment. */ protected final long lastArchivedSegment; + /** Checkpoint status. */ + protected final CheckpointStatus status; + /** WAL iterator. */ private final WALIterator iterator; @@ -5424,15 +5444,18 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan private final IgnitePredicate<Integer> cacheGroupPredicate; /** + * @param status Checkpoint status. * @param iterator WAL iterator. * @param lastArchivedSegment Last archived segment index. * @param cacheGroupPredicate Cache groups predicate. */ protected RestoreStateContext( + CheckpointStatus status, WALIterator iterator, long lastArchivedSegment, IgnitePredicate<Integer> cacheGroupPredicate ) { + this.status = status; this.iterator = iterator; this.lastArchivedSegment = lastArchivedSegment; this.cacheGroupPredicate = cacheGroupPredicate; @@ -5514,8 +5537,12 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * * @return Last read WAL record pointer. */ - public Optional<FileWALPointer> lastReadRecordPointer() { - return iterator.lastRead().map(ptr -> (FileWALPointer)ptr); + public FileWALPointer lastReadRecordPointer() { + assert status.startPtr != null && status.startPtr instanceof FileWALPointer; + + return iterator.lastRead() + .map(ptr -> (FileWALPointer)ptr) + .orElseGet(() -> (FileWALPointer)status.startPtr); } /** @@ -5523,7 +5550,7 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * @return Flag indicates need throws CRC exception or not. */ public boolean throwsCRCError() { - return lastReadRecordPointer().filter(ptr -> ptr.index() <= lastArchivedSegment).isPresent(); + return lastReadRecordPointer().index() <= lastArchivedSegment; } } @@ -5531,8 +5558,6 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan * Restore memory context. Tracks the safety of binary recovery. */ public class RestoreBinaryState extends RestoreStateContext { - /** Checkpoint status. */ - private final CheckpointStatus status; /** The flag indicates need to apply the binary update or no needed. */ private boolean needApplyBinaryUpdates; @@ -5549,9 +5574,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan long lastArchivedSegment, IgnitePredicate<Integer> cacheGroupsPredicate ) { - super(iterator, lastArchivedSegment, cacheGroupsPredicate); + super(status, iterator, lastArchivedSegment, cacheGroupsPredicate); - this.status = status; this.needApplyBinaryUpdates = status.needRestoreMemory(); } @@ -5618,8 +5642,8 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan /** * @param lastArchivedSegment Last archived segment index. */ - public RestoreLogicalState(WALIterator iterator, long lastArchivedSegment, IgnitePredicate<Integer> cacheGroupsPredicate) { - super(iterator, lastArchivedSegment, cacheGroupsPredicate); + public RestoreLogicalState(CheckpointStatus status, WALIterator iterator, long lastArchivedSegment, IgnitePredicate<Integer> cacheGroupsPredicate) { + super(status, iterator, lastArchivedSegment, cacheGroupsPredicate); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 180bd56..c6cdf7c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -2696,30 +2696,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (end == null) { long nextWalSegmentIdx = curWalSegmIdx + 1; + if (!isArchiverEnabled()) + if (canIgnoreCrcError(nextWalSegmentIdx, nextWalSegmentIdx, e, ptr)) + return null; + // Check that we should not look this segment up in archive directory. // Basically the same check as in "advanceSegment" method. - if (archiver != null) + if (isArchiverEnabled() && archiver != null) if (!canReadArchiveOrReserveWork(nextWalSegmentIdx)) try { long workIdx = nextWalSegmentIdx % dsCfg.getWalSegments(); - FileDescriptor fd = new FileDescriptor( - new File(walWorkDir, FileDescriptor.fileName(workIdx)), - nextWalSegmentIdx - ); - - try { - ReadFileHandle nextHandle = initReadHandle(fd, null); - - // "nextHandle == null" is true only if current segment is the last one in the - // whole history. Only in such case we ignore crc validation error and just stop - // as if we reached the end of the WAL. - if (nextHandle == null) - return null; - } - catch (IgniteCheckedException | FileNotFoundException initReadHandleException) { - e.addSuppressed(initReadHandleException); - } + if (canIgnoreCrcError(workIdx, nextWalSegmentIdx, e, ptr)) + return null; } finally { releaseWorkSegment(nextWalSegmentIdx); @@ -2747,6 +2736,51 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl archiver.releaseWorkSegment(absIdx); } + /** + * Check that archiver is enabled + */ + private boolean isArchiverEnabled() { + if (walArchiveDir != null && walWorkDir != null) + return !walArchiveDir.equals(walWorkDir); + + return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath())); + } + + /** + * @param workIdx Work index. + * @param walSegmentIdx Wal segment index. + * @param e Exception. + * @param ptr Ptr. + */ + private boolean canIgnoreCrcError( + long workIdx, + long walSegmentIdx, + @NotNull Exception e, + @Nullable FileWALPointer ptr) { + FileDescriptor fd = new FileDescriptor( + new File(walWorkDir, FileDescriptor.fileName(workIdx)), + walSegmentIdx + ); + + try { + if (!fd.file().exists()) + return true; + + ReadFileHandle nextHandle = initReadHandle(fd, ptr); + + // "nextHandle == null" is true only if current segment is the last one in the + // whole history. Only in such case we ignore crc validation error and just stop + // as if we reached the end of the WAL. + if (nextHandle == null) + return true; + } + catch (IgniteCheckedException | FileNotFoundException initReadHandleException) { + e.addSuppressed(initReadHandleException); + } + + return false; + } + /** {@inheritDoc} */ @Override protected AbstractReadFileHandle createReadFileHandle(SegmentIO fileIO, RecordSerializer ser, FileInput in) { diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteAbstractWalIteratorInvalidCrcTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteAbstractWalIteratorInvalidCrcTest.java index 4576129..2c3f0a5 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteAbstractWalIteratorInvalidCrcTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteAbstractWalIteratorInvalidCrcTest.java @@ -19,8 +19,6 @@ package org.apache.ignite.internal.processors.cache.persistence.db.wal.crc; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.function.BiFunction; @@ -37,9 +35,6 @@ import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.pagemem.wal.WALIterator; import org.apache.ignite.internal.pagemem.wal.WALPointer; import org.apache.ignite.internal.pagemem.wal.record.WALRecord; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; -import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; -import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; @@ -51,10 +46,6 @@ import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.NotNull; import org.junit.Test; -import static java.nio.ByteBuffer.allocate; -import static java.nio.file.StandardOpenOption.WRITE; -import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE; - /** * */ @@ -200,9 +191,10 @@ public abstract class IgniteAbstractWalIteratorInvalidCrcTest extends GridCommon FileDescriptor corruptedDesc = descPicker.apply(archiveDescs, descs); - FileWALPointer beforeCorruptedPtr = corruptWalSegmentFile( + FileWALPointer beforeCorruptedPtr = WalTestUtils.corruptWalSegmentFile( corruptedDesc, - iterFactory + iterFactory, + random ); if (shouldFail) { @@ -234,43 +226,4 @@ public abstract class IgniteAbstractWalIteratorInvalidCrcTest extends GridCommon iter.next(); } } - - /** - * Put zero CRC in one of records for the specified segment. - * @param desc WAL segment descriptor. - * @param iterFactory Iterator factory for segment iterating. - * @return Descriptor that is located strictly before the corrupted one. - * @throws IOException If IO exception. - * @throws IgniteCheckedException If iterator failed. - */ - protected FileWALPointer corruptWalSegmentFile( - FileDescriptor desc, - IgniteWalIteratorFactory iterFactory - ) throws IOException, IgniteCheckedException { - List<FileWALPointer> pointers = new ArrayList<>(); - - try (WALIterator it = iterFactory.iterator(desc.file())) { - for (IgniteBiTuple<WALPointer, WALRecord> tuple : it) { - pointers.add((FileWALPointer) tuple.get1()); - } - } - - // Should have a previous record to return and another value before that to ensure that "lastReadPtr" - // in "doTest" will always exist. - int idxCorrupted = 2 + random.nextInt(pointers.size() - 2); - - FileWALPointer pointer = pointers.get(idxCorrupted); - int crc32Off = pointer.fileOffset() + pointer.length() - CRC_SIZE; - - ByteBuffer zeroCrc32 = allocate(CRC_SIZE); // Has 0 value by default. - - FileIOFactory ioFactory = new RandomAccessFileIOFactory(); - try (FileIO io = ioFactory.create(desc.file(), WRITE)) { - io.write(zeroCrc32, crc32Off); - - io.force(true); - } - - return pointers.get(idxCorrupted - 1); - } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteWithoutArchiverWalIteratorInvalidCrcTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteWithoutArchiverWalIteratorInvalidCrcTest.java new file mode 100644 index 0000000..509bdd4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteWithoutArchiverWalIteratorInvalidCrcTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal.crc; + +import java.io.File; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_PATH; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.LOGICAL; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordPurpose.PHYSICAL; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD; + +/** */ +public class IgniteWithoutArchiverWalIteratorInvalidCrcTest extends GridCommonAbstractTest { + /** Size of inserting dummy value. */ + private static final int VALUE_SIZE = 4 * 1024; + + /** Size of WAL segment file. */ + private static final int WAL_SEGMENT_SIZE = 1024 * 1024; + + /** Count of WAL segment files in working directory. */ + private static final int WAL_SEGMENTS = DataStorageConfiguration.DFLT_WAL_SEGMENTS; + + /** Ignite instance. */ + protected IgniteEx ignite; + + /** Random instance for utility purposes. */ + protected Random random = new Random(); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setWalMode(WALMode.LOG_ONLY) + .setWalArchivePath(DFLT_WAL_PATH) // disable archiving + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopGrid(0); + + cleanPersistenceDir(); + } + + /** + * A logical record was corrupted or just doesn't exist because the end of wal is reached, after start checkpoint without end. + * -----||------||----X----> OR ----X-----> + * We recover all before it, and start the node. + */ + @Test + @WithSystemProperty(key = GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, value = "true") + public void nodeShouldStartIfLogicalRecordCorruptedAfterCheckpointOrWalStart() throws Exception { + startNodeAndPopulate(); + + stopGrid(0); + + IgniteWriteAheadLogManager walMgr = ignite.context().cache().context().wal(); + + File walDir = U.field(walMgr, "walWorkDir"); + + IgniteWalIteratorFactory iterFactory = new IgniteWalIteratorFactory(); + + List<FileDescriptor> walFiles = getWalFiles(walDir, iterFactory); + + FileDescriptor lastWalFile = walFiles.get(walFiles.size() - 1); + + List<FileWALPointer> pointers = WalTestUtils.getPointers(lastWalFile, iterFactory, LOGICAL); + + WalTestUtils.corruptWalSegmentFile(lastWalFile, pointers.get(pointers.size()-1)); + + IgniteEx ex = startGrid(0); + + ex.cluster().active(true); + } + + /** + * Binary record was corrupted, before start last checkpoint without end. + * -----||--X---||---------> + * Node can't start. + */ + @Test + public void nodeShouldStartIfBinaryRecordCorruptedBeforeEndCheckpoint() throws Exception { + startNodeAndPopulate(); + + stopGrid(0, true); + + IgniteWriteAheadLogManager walMgr = ignite.context().cache().context().wal(); + + File walDir = U.field(walMgr, "walWorkDir"); + + IgniteWalIteratorFactory iterFactory = new IgniteWalIteratorFactory(); + + List<FileDescriptor> walFiles = getWalFiles(walDir, iterFactory); + + FileDescriptor lastWalFile = walFiles.get(walFiles.size() - 1); + + List<FileWALPointer> checkpoints = WalTestUtils.getPointers(lastWalFile, iterFactory, CHECKPOINT_RECORD); + + List<FileWALPointer> binary = WalTestUtils.getPointers(lastWalFile, iterFactory, PHYSICAL).stream() + .filter(p -> p.fileOffset() < checkpoints.get(checkpoints.size() - 1).fileOffset()) + .collect(Collectors.toList()); + + FileWALPointer pointer = binary.get(binary.size() - 1); + + WalTestUtils.corruptWalSegmentFile(lastWalFile, pointer); + + GridTestUtils.assertThrows(log, () -> startGrid(0), Exception.class, null); + } + + /** + * Last start checkpoint record was corrupted. + * -----||------|X|--------> + * We stop the node. + */ + @Test + public void nodeShouldNotStartIfLastCheckpointRecordCorrupted() throws Exception { + startNodeAndPopulate(); + + stopGrid(0, true); + + IgniteWriteAheadLogManager walMgr = ignite.context().cache().context().wal(); + + File walDir = U.field(walMgr, "walWorkDir"); + + IgniteWalIteratorFactory iterFactory = new IgniteWalIteratorFactory(); + + List<FileDescriptor> walFiles = getWalFiles(walDir, iterFactory); + + Random corruptLastRecord = null; + + FileDescriptor lastWalFile = walFiles.get(walFiles.size() - 1); + + WalTestUtils.corruptWalSegmentFile(lastWalFile, iterFactory, corruptLastRecord); + + GridTestUtils.assertThrows(log, () -> startGrid(0), Exception.class, null); + } + + /** */ + private void startNodeAndPopulate() throws Exception { + ignite = startGrid(0); + + ignite.cluster().active(true); + + IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME); + + byte[] val = new byte[VALUE_SIZE]; + + // Fill value with random data. + random.nextBytes(val); + + // Amount of values that's enough to fill working dir at least twice. + int insertingCnt = 2 * WAL_SEGMENT_SIZE * WAL_SEGMENTS / VALUE_SIZE; + for (int i = 0; i < insertingCnt; i++) + cache.put(i, val); + } + + /** + * @param walDir Wal directory. + * @param iterFactory Iterator factory. + * @return Last wal segment + */ + private List<FileDescriptor> getWalFiles(File walDir, IgniteWalIteratorFactory iterFactory) { + return iterFactory.resolveWalFiles( + new IgniteWalIteratorFactory.IteratorParametersBuilder() + .filesOrDirs(walDir) + ); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/WalTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/WalTestUtils.java new file mode 100644 index 0000000..ce196d2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/WalTestUtils.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal.crc; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.lang.IgniteBiTuple; +import org.jetbrains.annotations.Nullable; + +import static java.nio.ByteBuffer.allocate; +import static java.nio.file.StandardOpenOption.WRITE; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE; + +/** + * Utility class for WAL testing. + */ +public class WalTestUtils { + /** + * Put zero CRC in one of records for the specified segment. + * + * @param desc WAL segment descriptor. + * @param iterFactory Iterator factory for segment iterating. + * @param random Random generator, If it is null, returns a last element position. + * @return Descriptor that is located strictly before the corrupted one. + * @throws IOException If IO exception. + * @throws IgniteCheckedException If iterator failed. + */ + public static FileWALPointer corruptWalSegmentFile( + FileDescriptor desc, + IgniteWalIteratorFactory iterFactory, + @Nullable Random random + ) throws IOException, IgniteCheckedException { + List<FileWALPointer> pointers = new ArrayList<>(); + + try (WALIterator it = iterFactory.iterator(desc.file())) { + for (IgniteBiTuple<WALPointer, WALRecord> tuple : it) + pointers.add((FileWALPointer)tuple.get1()); + } + + // Should have a previous record to return and another value before that to ensure that "lastReadPtr" + // in a test will always exist. + int idxCorrupted = random != null ? 2 + random.nextInt(pointers.size() - 2) : pointers.size() - 1; + + FileWALPointer pointer = pointers.get(idxCorrupted); + + corruptWalSegmentFile(desc, pointer); + + return pointers.get(idxCorrupted - 1); + } + + /** + * Put zero CRC in one of records for the specified segment. + * + * @param desc WAL segment descriptor. + * @param pointer WAL pointer. + */ + public static void corruptWalSegmentFile( + FileDescriptor desc, + FileWALPointer pointer + ) throws IOException { + + int crc32Off = pointer.fileOffset() + pointer.length() - CRC_SIZE; + + ByteBuffer zeroCrc32 = allocate(CRC_SIZE); // Has 0 value by default. + + FileIOFactory ioFactory = new RandomAccessFileIOFactory(); + try (FileIO io = ioFactory.create(desc.file(), WRITE)) { + io.write(zeroCrc32, crc32Off); + + io.force(true); + } + } + + /** + * @param desc Wal segment. + * @param iterFactory Iterator factory. + * @param recordType filter by RecordType + * @return List of pointers. + */ + public static List<FileWALPointer> getPointers( + FileDescriptor desc, + IgniteWalIteratorFactory iterFactory, + WALRecord.RecordType recordType + ) throws IgniteCheckedException { + List<FileWALPointer> cpPointers = new ArrayList<>(); + + try (WALIterator it = iterFactory.iterator(desc.file())) { + for (IgniteBiTuple<WALPointer, WALRecord> tuple : it) { + if (recordType.equals(tuple.get2().type())) + cpPointers.add((FileWALPointer)tuple.get1()); + } + } + + return cpPointers; + } + + /** + * @param desc Wal segment. + * @param iterFactory Iterator factory. + * @param recordPurpose Filter by RecordPurpose + * @return List of pointers. + */ + public static List<FileWALPointer> getPointers( + FileDescriptor desc, + IgniteWalIteratorFactory iterFactory, + WALRecord.RecordPurpose recordPurpose + ) throws IgniteCheckedException { + List<FileWALPointer> cpPointers = new ArrayList<>(); + + try (WALIterator it = iterFactory.iterator(desc.file())) { + for (IgniteBiTuple<WALPointer, WALRecord> tuple : it) { + if (recordPurpose.equals(tuple.get2().type().purpose())) + cpPointers.add((FileWALPointer)tuple.get1()); + } + } + + return cpPointers; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 00175e4..8f7be9a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -72,6 +72,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.Ignite import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgnitePureJavaCrcCompatibility; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteReplayWalIteratorInvalidCrcTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteStandaloneWalIteratorInvalidCrcTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteWithoutArchiverWalIteratorInvalidCrcTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneWalRecordsIteratorTest; import org.apache.ignite.testframework.GridTestUtils; @@ -101,6 +102,7 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, IgniteReplayWalIteratorInvalidCrcTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteFsyncReplayWalIteratorInvalidCrcTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgnitePureJavaCrcCompatibility.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteWithoutArchiverWalIteratorInvalidCrcTest.class, ignoredTests); addRealPageStoreTests(suite, ignoredTests);