IGNITE-9441 Improved handling of invalid CRC for WALIterator, in the end or in 
middle of WAL - Fixes #4714.

Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/e153114d
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/e153114d
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/e153114d

Branch: refs/heads/ignite-7251
Commit: e153114d4312b9e4def3e8e007720a99cabe6a76
Parents: 5fcd17e
Author: ibessonov <bessonov...@gmail.com>
Authored: Tue Sep 18 12:44:06 2018 +0300
Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Committed: Tue Sep 18 12:44:06 2018 +0300

----------------------------------------------------------------------
 .../wal/AbstractWalRecordsIterator.java         |   6 +-
 .../wal/FileWriteAheadLogManager.java           |  46 +++
 .../wal/FsyncModeFileWriteAheadLogManager.java  |  48 +++-
 .../wal/reader/IgniteWalIteratorFactory.java    |  10 +-
 .../reader/StandaloneWalRecordsIterator.java    |  20 ++
 ...IgniteAbstractWalIteratorInvalidCrcTest.java | 279 +++++++++++++++++++
 ...iteFsyncReplayWalIteratorInvalidCrcTest.java |  31 +++
 .../IgniteReplayWalIteratorInvalidCrcTest.java  |  54 ++++
 ...niteStandaloneWalIteratorInvalidCrcTest.java |  50 ++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   6 +
 10 files changed, 545 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
index 0b704ca..aa8eb31 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/AbstractWalRecordsIterator.java
@@ -280,8 +280,10 @@ public abstract class AbstractWalRecordsIterator
      * @return {@code null} if the error was handled and we can go ahead, 
{@code IgniteCheckedException} if the error
      * was not handled, and we should stop the iteration.
      */
-    protected IgniteCheckedException handleRecordException(@NotNull final 
Exception e,
-        @Nullable final FileWALPointer ptr) {
+    protected IgniteCheckedException handleRecordException(
+        @NotNull final Exception e,
+        @Nullable final FileWALPointer ptr
+    ) {
         if (log.isInfoEnabled())
             log.info("Stopping WAL iteration due to an exception: " + 
e.getMessage() + ", ptr=" + ptr);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 634cab3..907a311 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -95,6 +95,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactor
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
@@ -111,6 +112,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
 import org.apache.ignite.lang.IgniteBiTuple;
@@ -3202,6 +3204,50 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
             return nextHandle;
         }
 
+        /** {@inheritDoc} */
+        @Override protected IgniteCheckedException handleRecordException(
+            @NotNull Exception e,
+            @Nullable FileWALPointer ptr) {
+
+            if (e instanceof IgniteCheckedException)
+                if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
+                    // This means that there is no explicit last sengment, so 
we iterate unil the very end.
+                    if (end == null) {
+                        long nextWalSegmentIdx = curWalSegmIdx + 1;
+
+                        // Check that we should not look this segment up in 
archive directory.
+                        // Basically the same check as in "advanceSegment" 
method.
+                        if (archiver != null)
+                            if 
(!canReadArchiveOrReserveWork(nextWalSegmentIdx))
+                                try {
+                                    long workIdx = nextWalSegmentIdx % 
dsCfg.getWalSegments();
+
+                                    FileDescriptor fd = new FileDescriptor(
+                                        new File(walWorkDir, 
FileDescriptor.fileName(workIdx)),
+                                        nextWalSegmentIdx
+                                    );
+
+                                    try {
+                                        ReadFileHandle nextHandle = 
initReadHandle(fd, null);
+
+                                        // "nextHandle == null" is true only 
if current segment is the last one in the
+                                        // whole history. Only in such case we 
ignore crc validation error and just stop
+                                        // as if we reached the end of the WAL.
+                                        if (nextHandle == null)
+                                            return null;
+                                    }
+                                    catch (IgniteCheckedException | 
FileNotFoundException initReadHandleException) {
+                                        
e.addSuppressed(initReadHandleException);
+                                    }
+                                }
+                                finally {
+                                    releaseWorkSegment(nextWalSegmentIdx);
+                                }
+                    }
+
+            return super.handleRecordException(e, ptr);
+        }
+
         /**
          * @param absIdx Absolute index to check.
          * @return <ul><li> {@code True} if we can safely read the archive,  
</li> <li>{@code false} if the segment has

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
index 0f15d49..6a816a5 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FsyncModeFileWriteAheadLogManager.java
@@ -74,7 +74,6 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import 
org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
-import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.WALPointer;
 import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
@@ -85,10 +84,12 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter
 import 
org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
 import 
org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
 import 
org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
@@ -104,6 +105,7 @@ import org.apache.ignite.internal.util.typedef.CI1;
 import org.apache.ignite.internal.util.typedef.CIX1;
 import org.apache.ignite.internal.util.typedef.CO;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.util.worker.GridWorker;
@@ -3287,6 +3289,50 @@ public class FsyncModeFileWriteAheadLogManager extends 
GridCacheSharedManagerAda
             return nextHandle;
         }
 
+        /** {@inheritDoc} */
+        @Override protected IgniteCheckedException handleRecordException(
+            @NotNull Exception e,
+            @Nullable FileWALPointer ptr) {
+
+            if (e instanceof IgniteCheckedException)
+                if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
+                    // This means that there is no explicit last sengment, so 
we iterate unil the very end.
+                    if (end == null) {
+                        long nextWalSegmentIdx = curWalSegmIdx + 1;
+
+                        // Check that we should not look this segment up in 
archive directory.
+                        // Basically the same check as in "advanceSegment" 
method.
+                        if (archiver != null)
+                            if 
(!canReadArchiveOrReserveWork(nextWalSegmentIdx))
+                                try {
+                                    long workIdx = nextWalSegmentIdx % 
dsCfg.getWalSegments();
+
+                                    FileDescriptor fd = new FileDescriptor(
+                                        new File(walWorkDir, 
FileDescriptor.fileName(workIdx)),
+                                        nextWalSegmentIdx
+                                    );
+
+                                    try {
+                                        ReadFileHandle nextHandle = 
initReadHandle(fd, null);
+
+                                        // "nextHandle == null" is true only 
if current segment is the last one in the
+                                        // whole history. Only in such case we 
ignore crc validation error and just stop
+                                        // as if we reached the end of the WAL.
+                                        if (nextHandle == null)
+                                            return null;
+                                    }
+                                    catch (IgniteCheckedException | 
FileNotFoundException initReadHandleException) {
+                                        
e.addSuppressed(initReadHandleException);
+                                    }
+                                }
+                                finally {
+                                    releaseWorkSegment(nextWalSegmentIdx);
+                                }
+                    }
+
+            return super.handleRecordException(e, ptr);
+        }
+
         /**
          * @param absIdx Absolute index to check.
          * @return <ul><li> {@code True} if we can safely read the archive,  
</li> <li>{@code false} if the segment has

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
index 0e61fae..aaff33a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/IgniteWalIteratorFactory.java
@@ -373,6 +373,12 @@ public class IgniteWalIteratorFactory {
      */
     public static class IteratorParametersBuilder {
         /** */
+        public static final FileWALPointer DFLT_LOW_BOUND = new 
FileWALPointer(Long.MIN_VALUE, 0, 0);
+
+        /** */
+        public static final FileWALPointer DFLT_HIGH_BOUND = new 
FileWALPointer(Long.MAX_VALUE, Integer.MAX_VALUE, 0);
+
+        /** */
         private File[] filesOrDirs;
 
         /** */
@@ -404,10 +410,10 @@ public class IgniteWalIteratorFactory {
         @Nullable private IgniteBiPredicate<RecordType, WALPointer> filter;
 
         /** */
-        private FileWALPointer lowBound = new FileWALPointer(Long.MIN_VALUE, 
0, 0);
+        private FileWALPointer lowBound = DFLT_LOW_BOUND;
 
         /** */
-        private FileWALPointer highBound = new FileWALPointer(Long.MAX_VALUE, 
Integer.MAX_VALUE, 0);
+        private FileWALPointer highBound = DFLT_HIGH_BOUND;
 
         /**
          * @param filesOrDirs Paths to files or directories.

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
index a222877..7cfb66d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneWalRecordsIterator.java
@@ -46,17 +46,20 @@ import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileInput;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.ReadFileHandle;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.WalSegmentTailReachedException;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.SegmentHeader;
 import 
org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor;
 import org.apache.ignite.internal.util.typedef.T2;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder.DFLT_HIGH_BOUND;
 import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
 
 /**
@@ -303,6 +306,23 @@ class StandaloneWalRecordsIterator extends 
AbstractWalRecordsIterator {
         return super.postProcessRecord(rec);
     }
 
+    /** {@inheritDoc} */
+    @Override protected IgniteCheckedException handleRecordException(
+        @NotNull Exception e,
+        @Nullable FileWALPointer ptr
+    ) {
+        if (e instanceof IgniteCheckedException)
+            if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
+                // "curIdx" is an index in walFileDescriptors list.
+                if (curIdx == walFileDescriptors.size() - 1)
+                    // This means that there is no explicit last sengment, so 
we stop as if we reached the end
+                    // of the WAL.
+                    if (highBound.equals(DFLT_HIGH_BOUND))
+                        return null;
+
+        return super.handleRecordException(e, ptr);
+    }
+
     /**
      * Performs post processing of lazy data record, converts it to unwrap 
record.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteAbstractWalIteratorInvalidCrcTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteAbstractWalIteratorInvalidCrcTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteAbstractWalIteratorInvalidCrcTest.java
new file mode 100644
index 0000000..0b53bb8
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteAbstractWalIteratorInvalidCrcTest.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal.crc;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.function.BiFunction;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.apache.ignite.internal.pagemem.wal.WALPointer;
+import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.NotNull;
+
+import static java.nio.ByteBuffer.allocate;
+import static java.nio.file.StandardOpenOption.WRITE;
+import static 
org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.CRC_SIZE;
+
+/**
+ *
+ */
+public abstract class IgniteAbstractWalIteratorInvalidCrcTest extends 
GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Size of inserting dummy value. */
+    private static final int VALUE_SIZE = 4 * 1024;
+
+    /** Size of WAL segment file. */
+    private static final int WAL_SEGMENT_SIZE = 1024 * 1024;
+
+    /** Count of WAL segment files in working directory. */
+    private static final int WAL_SEGMENTS = 
DataStorageConfiguration.DFLT_WAL_SEGMENTS;
+
+    /** Ignite instance. */
+    protected IgniteEx ignite;
+
+    /** Random instance for utility purposes. */
+    protected Random random = new Random();
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setWalSegmentSize(WAL_SEGMENT_SIZE)
+                .setWalMode(getWalMode())
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                )
+        );
+
+        cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME));
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        ignite = (IgniteEx)startGrid();
+
+        ignite.cluster().active(true);
+
+        IgniteCache<Integer, byte[]> cache = ignite.cache(DEFAULT_CACHE_NAME);
+
+        byte[] val = new byte[VALUE_SIZE];
+
+        // Fill value with random data.
+        random.nextBytes(val);
+
+        // Amount of values that's enough to fill working dir at least twice.
+        int insertingCnt = 2 * WAL_SEGMENT_SIZE * WAL_SEGMENTS / VALUE_SIZE;
+        for (int i = 0; i < insertingCnt; i++)
+            cache.put(i, val);
+
+        ignite.cluster().active(false);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopGrid();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * @return WAL mode that will be used in {@link IgniteConfiguration}.
+     */
+    @NotNull protected abstract WALMode getWalMode();
+
+    /**
+     * Instantiate WAL iterator according to the iterator type of specific 
implementation.
+     * @param walMgr WAL manager instance.
+     * @param ignoreArchiveDir Do not include archive segments in resulting 
iterator if this flag is true.
+     * @return WAL iterator instance.
+     * @throws IgniteCheckedException If iterator creation failed for some 
reason.
+     */
+    @NotNull protected abstract WALIterator getWalIterator(
+        IgniteWriteAheadLogManager walMgr,
+        boolean ignoreArchiveDir
+    ) throws IgniteCheckedException;
+
+    /**
+     * Test that iteration fails if one of archive segments contains record 
with invalid CRC.
+     * @throws Exception If failed.
+     */
+    public void testArchiveCorruptedPtr() throws Exception {
+        doTest((archiveDescs, descs) -> 
archiveDescs.get(random.nextInt(archiveDescs.size())), false, true);
+    }
+
+    /**
+     * Test that iteration fails if one of segments in working directory 
contains record with invalid CRC
+     * and it is not the tail segment.
+     * @throws Exception If failed.
+     */
+    public void testNotTailCorruptedPtr() throws Exception {
+        doTest((archiveDescs, descs) -> descs.get(random.nextInt(descs.size() 
- 1)), true, true);
+    }
+
+
+    /**
+     * Test that iteration does not fail if tail segment in working directory 
contains record with invalid CRC.
+     * @throws Exception If failed.
+     */
+    public void testTailCorruptedPtr() throws Exception {
+        doTest((archiveDescs, descs) -> descs.get(descs.size() - 1), false, 
false);
+    }
+
+    /**
+     * @param descPicker Function that picks WAL segment to corrupt from 
archive segments list
+     *      and working directory segments list.
+     * @param ignoreArchiveDir Do not iterate over archive segments if this 
flag is true.
+     * @param shouldFail Whether iteration is axpected to fail or not.
+     * @throws IOException If IO exception.
+     * @throws IgniteCheckedException If iterator failed.
+     */
+    protected void doTest(
+        BiFunction<List<FileDescriptor>, List<FileDescriptor>, FileDescriptor> 
descPicker,
+        boolean ignoreArchiveDir,
+        boolean shouldFail
+    ) throws IOException, IgniteCheckedException {
+        IgniteWriteAheadLogManager walMgr = 
ignite.context().cache().context().wal();
+
+        IgniteWalIteratorFactory iterFactory = new IgniteWalIteratorFactory();
+
+        File walArchiveDir = U.field(walMgr, "walArchiveDir");
+        List<FileDescriptor> archiveDescs = iterFactory.resolveWalFiles(
+            new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .filesOrDirs(walArchiveDir)
+        );
+
+        File walDir = U.field(walMgr, "walWorkDir");
+        List<FileDescriptor> descs = iterFactory.resolveWalFiles(
+            new IgniteWalIteratorFactory.IteratorParametersBuilder()
+                .filesOrDirs(walDir)
+        );
+
+        FileDescriptor corruptedDesc = descPicker.apply(archiveDescs, descs);
+
+        FileWALPointer beforeCorruptedPtr = corruptWalSegmentFile(
+            corruptedDesc,
+            iterFactory
+        );
+
+        if (shouldFail) {
+            FileWALPointer[] lastReadPtrRef = new FileWALPointer[1];
+
+            IgniteException igniteException = (IgniteException) 
GridTestUtils.assertThrows(log, () -> {
+                try (WALIterator iter = getWalIterator(walMgr, 
ignoreArchiveDir)) {
+                    for (IgniteBiTuple<WALPointer, WALRecord> tuple : iter) {
+                        FileWALPointer ptr = (FileWALPointer)tuple.get1();
+                        lastReadPtrRef[0] = ptr;
+                    }
+                }
+
+                return null;
+            }, IgniteException.class, "Failed to read WAL record");
+
+            
assertTrue(igniteException.hasCause(IgniteDataIntegrityViolationException.class));
+
+            FileWALPointer lastReadPtr = lastReadPtrRef[0];
+            assertNotNull(lastReadPtr);
+
+            // WAL iterator advances to the next record and only then returns 
current one,
+            // so next record has to be valid as well.
+            assertEquals(lastReadPtr.next(), beforeCorruptedPtr);
+        }
+        else
+            try (WALIterator iter = getWalIterator(walMgr, ignoreArchiveDir)) {
+                while (iter.hasNext())
+                    iter.next();
+            }
+    }
+
+    /**
+     * Put zero CRC in one of records for the specified segment.
+     * @param desc WAL segment descriptor.
+     * @param iterFactory Iterator factory for segment iterating.
+     * @return Descriptor that is located strictly before the corrupted one.
+     * @throws IOException If IO exception.
+     * @throws IgniteCheckedException If iterator failed.
+     */
+    protected FileWALPointer corruptWalSegmentFile(
+        FileDescriptor desc,
+        IgniteWalIteratorFactory iterFactory
+    ) throws IOException, IgniteCheckedException {
+        List<FileWALPointer> pointers = new ArrayList<>();
+
+        try (WALIterator it = iterFactory.iterator(desc.file())) {
+            for (IgniteBiTuple<WALPointer, WALRecord> tuple : it) {
+                pointers.add((FileWALPointer) tuple.get1());
+            }
+        }
+
+        // Should have a previous record to return and another value before 
that to ensure that "lastReadPtr"
+        // in "doTest" will always exist.
+        int idxCorrupted = 2 + random.nextInt(pointers.size() - 2);
+
+        FileWALPointer pointer = pointers.get(idxCorrupted);
+        int crc32Off = pointer.fileOffset() + pointer.length() - CRC_SIZE;
+
+        ByteBuffer zeroCrc32 = allocate(CRC_SIZE); // Has 0 value by default.
+
+        FileIOFactory ioFactory = new RandomAccessFileIOFactory();
+        try (FileIO io = ioFactory.create(desc.file(), WRITE)) {
+            io.write(zeroCrc32, crc32Off);
+
+            io.force(true);
+        }
+
+        return pointers.get(idxCorrupted - 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteFsyncReplayWalIteratorInvalidCrcTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteFsyncReplayWalIteratorInvalidCrcTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteFsyncReplayWalIteratorInvalidCrcTest.java
new file mode 100644
index 0000000..4629acb
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteFsyncReplayWalIteratorInvalidCrcTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal.crc;
+
+import org.apache.ignite.configuration.WALMode;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class IgniteFsyncReplayWalIteratorInvalidCrcTest extends 
IgniteReplayWalIteratorInvalidCrcTest {
+    /** {@inheritDoc} */
+    @NotNull @Override protected WALMode getWalMode() {
+        return WALMode.FSYNC;
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteReplayWalIteratorInvalidCrcTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteReplayWalIteratorInvalidCrcTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteReplayWalIteratorInvalidCrcTest.java
new file mode 100644
index 0000000..756ef78
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteReplayWalIteratorInvalidCrcTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal.crc;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class IgniteReplayWalIteratorInvalidCrcTest extends 
IgniteAbstractWalIteratorInvalidCrcTest {
+    /** {@inheritDoc} */
+    @NotNull @Override protected WALMode getWalMode() {
+        return WALMode.LOG_ONLY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected WALIterator getWalIterator(
+        IgniteWriteAheadLogManager walMgr,
+        boolean ignoreArchiveDir
+    ) throws IgniteCheckedException {
+        if (ignoreArchiveDir)
+            throw new UnsupportedOperationException(
+                "Cannot invoke \"getWalIterator\" with true 
\"ignoreArchiveDir\" parameter value."
+            );
+        else
+            return walMgr.replay(null);
+    }
+
+    /**
+     * {@inheritDoc}
+     * Case is not relevant to the replay iterator.
+     */
+    @Override public void testNotTailCorruptedPtr() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteStandaloneWalIteratorInvalidCrcTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteStandaloneWalIteratorInvalidCrcTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteStandaloneWalIteratorInvalidCrcTest.java
new file mode 100644
index 0000000..8802184
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/crc/IgniteStandaloneWalIteratorInvalidCrcTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.db.wal.crc;
+
+import java.io.File;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.WALMode;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.WALIterator;
+import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ *
+ */
+public class IgniteStandaloneWalIteratorInvalidCrcTest extends 
IgniteAbstractWalIteratorInvalidCrcTest {
+    /** {@inheritDoc} */
+    @NotNull @Override protected WALMode getWalMode() {
+        return WALMode.LOG_ONLY;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected WALIterator getWalIterator(
+        IgniteWriteAheadLogManager walMgr,
+        boolean ignoreArchiveDir
+    ) throws IgniteCheckedException {
+        File walArchiveDir = U.field(walMgr, "walArchiveDir");
+        File walDir = U.field(walMgr, "walWorkDir");
+
+        IgniteWalIteratorFactory iterFactory = new IgniteWalIteratorFactory();
+
+        return ignoreArchiveDir ? iterFactory.iterator(walDir) : 
iterFactory.iterator(walArchiveDir, walDir);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/e153114d/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 956d256..a9f2601 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -57,6 +57,9 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompact
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveFsyncTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveLogOnlyTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteFsyncReplayWalIteratorInvalidCrcTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteReplayWalIteratorInvalidCrcTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteStandaloneWalIteratorInvalidCrcTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.reader.IgniteWalReaderTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.wal.reader.StandaloneWalRecordsIteratorTest;
 
@@ -72,6 +75,9 @@ public class IgnitePdsTestSuite2 extends TestSuite {
 
         // Integrity test.
         suite.addTestSuite(IgniteDataIntegrityTests.class);
+        suite.addTestSuite(IgniteStandaloneWalIteratorInvalidCrcTest.class);
+        suite.addTestSuite(IgniteReplayWalIteratorInvalidCrcTest.class);
+        suite.addTestSuite(IgniteFsyncReplayWalIteratorInvalidCrcTest.class);
 
         addRealPageStoreTests(suite);
 

Reply via email to