http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorExceptionDuringReadTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorExceptionDuringReadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorExceptionDuringReadTest.java new file mode 100644 index 0000000..ccd889a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/IgniteWalIteratorExceptionDuringReadTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.WALIterator; +import org.apache.ignite.internal.pagemem.wal.WALPointer; +import org.apache.ignite.internal.pagemem.wal.record.WALRecord; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Assert; + +/** + * + */ +public class IgniteWalIteratorExceptionDuringReadTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int WAL_SEGMENT_SIZE = 1024 * 1024 * 20; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setWalMode(WALMode.LOG_ONLY) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + cfg.setCacheConfiguration(new CacheConfiguration(DEFAULT_CACHE_NAME)); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + cleanPersistenceDir(); + } + + /** + * @throws Exception If failed. + */ + public void test() throws Exception { + IgniteEx ig = (IgniteEx)startGrid(); + + ig.cluster().active(true); + + IgniteCache<Integer, byte[]> cache = ig.cache(DEFAULT_CACHE_NAME); + + for (int i = 0; i < 20 * 4; i++) + cache.put(i, new byte[1024 * 1024]); + + ig.cluster().active(false); + + IgniteWalIteratorFactory iteratorFactory = new IgniteWalIteratorFactory(log); + + FileWALPointer failOnPtr = new FileWALPointer(3, 1024 * 1024 * 5, 0); + + String failMessage = "test fail message"; + + IteratorParametersBuilder builder = new IteratorParametersBuilder() + .filesOrDirs(U.defaultWorkDirectory()) + .filter((r, ptr) -> { + FileWALPointer ptr0 = (FileWALPointer)ptr; + + if (ptr0.compareTo(failOnPtr) >= 0) + throw new TestRuntimeException(failMessage); + + return true; + }); + + try (WALIterator it = iteratorFactory.iterator(builder)) { + FileWALPointer ptr = null; + + boolean failed = false; + + while (it.hasNext()) { + try { + IgniteBiTuple<WALPointer, WALRecord> tup = it.next(); + + ptr = (FileWALPointer)tup.get1(); + } + catch (IgniteException e) { + Assert.assertNotNull(ptr); + Assert.assertEquals(failOnPtr.index(), ptr.index()); + Assert.assertTrue(ptr.compareTo(failOnPtr) < 0); + + failed = X.hasCause(e, TestRuntimeException.class); + + break; + } + } + + assertTrue(failed); + } + } + + /** + * + */ + private static class TestRuntimeException extends IgniteException { + /** + * @param msg Exception message. + */ + private TestRuntimeException(String msg) { + super(msg); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java index 72ee0c8..7c54d62 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/reader/IgniteWalReaderTest.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.io.Serializable; -import java.util.Arrays; import java.util.Collection; import java.util.EnumMap; import java.util.HashMap; @@ -48,11 +47,10 @@ import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.WALMode; -import org.apache.ignite.events.Event; import org.apache.ignite.events.EventType; import org.apache.ignite.events.WalSegmentArchivedEvent; import org.apache.ignite.internal.pagemem.wal.WALIterator; @@ -66,24 +64,27 @@ import org.apache.ignite.internal.pagemem.wal.record.WALRecord; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory; +import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; import org.apache.ignite.lang.IgniteBiTuple; import org.apache.ignite.lang.IgniteInClosure; -import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.logger.NullLogger; -import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.junit.Assert; +import static java.util.Arrays.fill; import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.DATA_RECORD; +import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.TX_RECORD; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.CREATE; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; @@ -93,6 +94,9 @@ import static org.apache.ignite.internal.processors.cache.persistence.filename.P * Test suite for WAL segments reader and event generator. */ public class IgniteWalReaderTest extends GridCommonAbstractTest { + /** */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + /** Wal segments count */ private static final int WAL_SEGMENTS = 10; @@ -103,10 +107,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { private static final String CACHE_ADDL_NAME = "cache1"; /** Dump records to logger. Should be false for non local run. */ - private static final boolean dumpRecords = false; - - /** Page size to set. */ - public static final int PAGE_SIZE = 4 * 1024; + private static final boolean DUMP_RECORDS = true; /** * Field for transferring setting from test to getConfig method. @@ -125,9 +126,11 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - final IgniteConfiguration cfg = super.getConfiguration(gridName); + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER)); - final CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME); + CacheConfiguration<Integer, IndexedObject> ccfg = new CacheConfiguration<>(CACHE_NAME); ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); ccfg.setRebalanceMode(CacheRebalanceMode.SYNC); @@ -140,8 +143,9 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { DataStorageConfiguration dsCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( - new DataRegionConfiguration().setMaxSize(1024L * 1024 * 1024).setPersistenceEnabled(true)) - .setPageSize(PAGE_SIZE) + new DataRegionConfiguration() + .setMaxSize(1024L * 1024 * 1024) + .setPersistenceEnabled(true)) .setWalHistorySize(1) .setWalSegmentSize(1024 * 1024) .setWalSegments(WAL_SEGMENTS) @@ -150,12 +154,12 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { if (archiveIncompleteSegmentAfterInactivityMs > 0) dsCfg.setWalAutoArchiveAfterInactivity(archiveIncompleteSegmentAfterInactivityMs); - final String workDir = U.defaultWorkDirectory(); - final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false); - final File wal = new File(db, "wal"); + String workDir = U.defaultWorkDirectory(); + File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false); + File wal = new File(db, "wal"); if(setWalAndArchiveToSameValue) { - final String walAbsPath = wal.getAbsolutePath(); + String walAbsPath = wal.getAbsolutePath(); dsCfg.setWalPath(walAbsPath); dsCfg.setWalArchivePath(walAbsPath); @@ -173,85 +177,69 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { @Override protected void beforeTest() throws Exception { stopAllGrids(); - deleteWorkFiles(); + cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { stopAllGrids(); + cleanPersistenceDir(); + if (clearProperties) System.clearProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS); } /** - * @throws IgniteCheckedException If failed. - */ - private void deleteWorkFiles() throws Exception { - cleanPersistenceDir(); - } - - /** * @throws Exception if failed. */ public void testFillWalAndReadRecords() throws Exception { setWalAndArchiveToSameValue = false; - final int cacheObjectsToWrite = 10000; - final Ignite ignite0 = startGrid("node0"); + Ignite ignite0 = startGrid(); - ignite0.active(true); + ignite0.cluster().active(true); - final Serializable consistentId = (Serializable)ignite0.cluster().localNode().consistentId(); - final String subfolderName = genNewStyleSubfolderName(0, (UUID)consistentId); + Serializable consistentId = (Serializable)ignite0.cluster().localNode().consistentId(); - putDummyRecords(ignite0, cacheObjectsToWrite); + String subfolderName = genNewStyleSubfolderName(0, (UUID)consistentId); - stopGrid("node0"); + int cacheObjectsToWrite = 10_000; - final String workDir = U.defaultWorkDirectory(); - final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false); - final File wal = new File(db, "wal"); - final File walArchive = setWalAndArchiveToSameValue ? wal : new File(wal, "archive"); + putDummyRecords(ignite0, cacheObjectsToWrite); - int[] checkKeyIterArr = new int[cacheObjectsToWrite]; + stopGrid(); - final File walArchiveDirWithConsistentId = new File(walArchive, subfolderName); - final File walWorkDirWithConsistentId = new File(wal, subfolderName); - final IgniteWalIteratorFactory factory = createWalIteratorFactory(workDir, subfolderName); + String workDir = U.defaultWorkDirectory(); - //Check iteratorArchiveDirectory and iteratorArchiveFiles are same. - final int cntArchiveDir = iterateAndCount(factory.iteratorArchiveDirectory(walArchiveDirWithConsistentId)); + File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false); - log.info("Total records loaded using directory : " + cntArchiveDir); + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); - final int cntArchiveFileByFile = iterateAndCount(factory.iteratorArchiveFiles( - walArchiveDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER))); + IteratorParametersBuilder params = + createIteratorParametersBuilder(workDir, subfolderName) + .filesOrDirs(db); - log.info("Total records loaded using archive directory (file-by-file): " + cntArchiveFileByFile); + // Check iteratorArchiveDirectory and iteratorArchiveFiles are same. + int cntArchiveDir = iterateAndCount(factory.iterator(params)); - assertTrue(cntArchiveDir == cntArchiveFileByFile); + log.info("Total records loaded using directory : " + cntArchiveDir); - //Check iteratorArchiveFiles + iteratorWorkFiles iterate over all entries. - Arrays.fill(checkKeyIterArr, 0); + assertTrue(cntArchiveDir > 0); - iterateAndCountDataRecord(factory.iteratorArchiveFiles( - walArchiveDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER)), new IgniteBiInClosure<Object, Object>() { - @Override public void apply(Object o, Object o2) { - checkKeyIterArr[(Integer)o]++; - } - }, null); + // Check iteratorArchiveFiles + iteratorWorkFiles iterate over all entries. + int[] checkKeyIterArr = new int[cacheObjectsToWrite]; - final File[] workFiles = walWorkDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER); + fill(checkKeyIterArr, 0); - iterateAndCountDataRecord(factory.iteratorWorkFiles(workFiles), new IgniteBiInClosure<Object, Object>() { - @Override public void apply(Object o, Object o2) { - checkKeyIterArr[(Integer) o]++; - } - }, null).size(); + iterateAndCountDataRecord( + factory.iterator(params), + (o1, o2) -> checkKeyIterArr[(Integer)o1]++, + null + ); - for (int i =0 ; i< cacheObjectsToWrite; i++) - assertTrue("Iterator didn't find key="+ i, checkKeyIterArr[i] > 0); + for (int i = 0; i < cacheObjectsToWrite; i++) + assertTrue("Iterator didn't find key=" + i, checkKeyIterArr[i] > 0); } /** @@ -262,35 +250,29 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @throws IgniteCheckedException if failed to iterate. */ private int iterateAndCount(WALIterator walIter) throws IgniteCheckedException { - return iterateAndCount(walIter, true); - } - - /** - * Iterates on records and closes iterator. - * - * @param walIter iterator to count, will be closed. - * @param touchEntries access data within entries. - * @return count of records. - * @throws IgniteCheckedException if failed to iterate. - */ - private int iterateAndCount(WALIterator walIter, boolean touchEntries) throws IgniteCheckedException { int cnt = 0; try (WALIterator it = walIter) { while (it.hasNextX()) { - final IgniteBiTuple<WALPointer, WALRecord> next = it.nextX(); - final WALRecord walRecord = next.get2(); - if (touchEntries && walRecord.type() == WALRecord.RecordType.DATA_RECORD) { - final DataRecord record = (DataRecord)walRecord; + IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX(); + + WALRecord walRecord = tup.get2(); + + if (walRecord.type() == DATA_RECORD) { + DataRecord record = (DataRecord)walRecord; + for (DataEntry entry : record.writeEntries()) { - final KeyCacheObject key = entry.key(); - final CacheObject val = entry.value(); - if (dumpRecords) + KeyCacheObject key = entry.key(); + CacheObject val = entry.value(); + + if (DUMP_RECORDS) log.info("Op: " + entry.op() + ", Key: " + key + ", Value: " + val); } } - if (dumpRecords) + + if (DUMP_RECORDS) log.info("Record: " + walRecord); + cnt++; } } @@ -303,128 +285,148 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @throws Exception if failed. */ public void testArchiveCompletedEventFired() throws Exception { - final AtomicBoolean evtRecorded = new AtomicBoolean(); + AtomicBoolean evtRecorded = new AtomicBoolean(); - final Ignite ignite = startGrid("node0"); + Ignite ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); final IgniteEvents evts = ignite.events(); if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED)) - assertTrue("nothing to test", false); + fail("nothing to test"); - evts.localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event e) { - WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; - long idx = archComplEvt.getAbsWalSegmentIdx(); - log.info("Finished archive for segment [" + idx + ", " + - archComplEvt.getArchiveFile() + "]: [" + e + "]"); + evts.localListen(e -> { + WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; - evtRecorded.set(true); - return true; - } + long idx = archComplEvt.getAbsWalSegmentIdx(); + + log.info("Finished archive for segment [" + + idx + ", " + archComplEvt.getArchiveFile() + "]: [" + e + "]"); + + evtRecorded.set(true); + + return true; }, EVT_WAL_SEGMENT_ARCHIVED); putDummyRecords(ignite, 500); - stopGrid("node0"); + stopGrid(); + assertTrue(evtRecorded.get()); } /** - * Puts provided number of records to fill WAL. + * Tests time out based WAL segment archiving. * - * @param ignite ignite instance. - * @param recordsToWrite count. + * @throws Exception if failure occurs. */ - private void putDummyRecords(Ignite ignite, int recordsToWrite) { - IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME); + public void testArchiveIncompleteSegmentAfterInactivity() throws Exception { + AtomicBoolean waitingForEvt = new AtomicBoolean(); - for (int i = 0; i < recordsToWrite; i++) - cache0.put(i, new IndexedObject(i)); - } + CountDownLatch archiveSegmentForInactivity = new CountDownLatch(1); - /** - * Puts provided number of records to fill WAL. - * - * @param ignite ignite instance. - * @param recordsToWrite count. - */ - private void putAllDummyRecords(Ignite ignite, int recordsToWrite) { - IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME); + archiveIncompleteSegmentAfterInactivityMs = 1000; - Map<Object, Object> values = new HashMap<>(); + Ignite ignite = startGrid(); - for (int i = 0; i < recordsToWrite; i++) - values.put(i, new IndexedObject(i)); + ignite.cluster().active(true); - cache0.putAll(values); - } + IgniteEvents evts = ignite.events(); - /** - * Puts provided number of records to fill WAL under transactions. - * - * @param ignite ignite instance. - * @param recordsToWrite count. - * @param txCnt transactions to run. If number is less then records count, txCnt records will be written. - */ - private IgniteCache<Object, Object> txPutDummyRecords(Ignite ignite, int recordsToWrite, int txCnt) { - IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME); - int keysPerTx = recordsToWrite / txCnt; - if (keysPerTx == 0) - keysPerTx = 1; - for (int t = 0; t < txCnt; t++) { - try (Transaction tx = ignite.transactions().txStart()) { - for (int i = t * keysPerTx; i < (t + 1) * keysPerTx; i++) - cache0.put(i, new IndexedObject(i)); + evts.localListen(e -> { + WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; - tx.commit(); - } - } - return cache0; + long idx = archComplEvt.getAbsWalSegmentIdx(); + + log.info("Finished archive for segment [" + idx + ", " + + archComplEvt.getArchiveFile() + "]: [" + e + "]"); + + if (waitingForEvt.get()) + archiveSegmentForInactivity.countDown(); + + return true; + }, EVT_WAL_SEGMENT_ARCHIVED); + + putDummyRecords(ignite, 100); + + waitingForEvt.set(true); // Flag for skipping regular log() and rollOver(). + + log.info("Wait for archiving segment for inactive grid started"); + + boolean recordedAfterSleep = archiveSegmentForInactivity.await( + archiveIncompleteSegmentAfterInactivityMs + 1001, TimeUnit.MILLISECONDS); + + stopGrid(); + + assertTrue(recordedAfterSleep); } /** - * Tests time out based WAL segment archiving. + * Tests archive completed event is fired. * - * @throws Exception if failure occurs. + * @throws Exception if failed. */ - public void testArchiveIncompleteSegmentAfterInactivity() throws Exception { - final AtomicBoolean waitingForEvt = new AtomicBoolean(); - final CountDownLatch archiveSegmentForInactivity = new CountDownLatch(1); + public void testFillWalForExactSegmentsCount() throws Exception { + customWalMode = WALMode.FSYNC; - archiveIncompleteSegmentAfterInactivityMs = 1000; + CountDownLatch reqSegments = new CountDownLatch(15); - final Ignite ignite = startGrid("node0"); + Ignite ignite = startGrid(); - ignite.active(true); + ignite.cluster().active(true); final IgniteEvents evts = ignite.events(); - evts.localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event e) { - WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; - long idx = archComplEvt.getAbsWalSegmentIdx(); - log.info("Finished archive for segment [" + idx + ", " + - archComplEvt.getArchiveFile() + "]: [" + e + "]"); + if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED)) + fail("nothing to test"); - if (waitingForEvt.get()) - archiveSegmentForInactivity.countDown(); - return true; - } + evts.localListen(e -> { + WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; + + long idx = archComplEvt.getAbsWalSegmentIdx(); + + log.info("Finished archive for segment [" + idx + ", " + + archComplEvt.getArchiveFile() + "]: [" + e + "]"); + + reqSegments.countDown(); + + return true; }, EVT_WAL_SEGMENT_ARCHIVED); - putDummyRecords(ignite, 100); - waitingForEvt.set(true); //flag for skipping regular log() and rollOver() + int totalEntries = 0; - log.info("Wait for archiving segment for inactive grid started"); + while (reqSegments.getCount() > 0) { + int write = 500; - boolean recordedAfterSleep = - archiveSegmentForInactivity.await(archiveIncompleteSegmentAfterInactivityMs + 1001, TimeUnit.MILLISECONDS); + putAllDummyRecords(ignite, write); - stopGrid("node0"); - assertTrue(recordedAfterSleep); + totalEntries += write; + + Assert.assertTrue("Too much entries generated, but segments was not become available", + totalEntries < 10000); + } + + String subfolderName = genDbSubfolderName(ignite, 0); + + stopGrid(); + + String workDir = U.defaultWorkDirectory(); + + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); + + IteratorParametersBuilder iteratorParametersBuilder = createIteratorParametersBuilder(workDir, subfolderName); + + iteratorParametersBuilder.filesOrDirs(workDir); + + scanIterateAndCount( + factory, + iteratorParametersBuilder, + totalEntries, + 0, + null, + null + ); } /** @@ -450,51 +452,50 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @throws Exception if failed. */ public void testTxFillWalAndExtractDataRecords() throws Exception { - final int cntEntries = 1000; - final int txCnt = 100; + Ignite ignite0 = startGrid(); - final Ignite ignite0 = startGrid("node0"); + ignite0.cluster().active(true); - ignite0.active(true); + int cntEntries = 1000; + int txCnt = 100; - final IgniteCache<Object, Object> entries = txPutDummyRecords(ignite0, cntEntries, txCnt); + IgniteCache<Object, Object> entries = txPutDummyRecords(ignite0, cntEntries, txCnt); - final Map<Object, Object> ctrlMap = new HashMap<>(); for (Cache.Entry<Object, Object> next : entries) - ctrlMap.put(next.getKey(), next.getValue()); + Map<Object, Object> ctrlMap = new HashMap<>(); + for (Cache.Entry<Object, Object> next : entries) + ctrlMap.put(next.getKey(), next.getValue()); - final String subfolderName = genDbSubfolderName(ignite0, 0); - stopGrid("node0"); + String subfolderName = genDbSubfolderName(ignite0, 0); - final String workDir = U.defaultWorkDirectory(); - final File binaryMeta = U.resolveWorkDirectory(workDir, "binary_meta", false); - final File binaryMetaWithConsId = new File(binaryMeta, subfolderName); - final File marshallerMapping = U.resolveWorkDirectory(workDir, "marshaller", false); + stopGrid(); - final IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log, - PAGE_SIZE, - binaryMetaWithConsId, - marshallerMapping); + String workDir = U.defaultWorkDirectory(); - final IgniteBiInClosure<Object, Object> objConsumer = new IgniteBiInClosure<Object, Object>() { - @Override public void apply(Object key, Object val) { - boolean rmv = remove(ctrlMap, key, val); - if (!rmv) - log.error("Unable to remove Key and value from control Map K:[" + key + "] V: [" + val + "]"); + IteratorParametersBuilder params = createIteratorParametersBuilder(workDir,subfolderName); - if (val instanceof IndexedObject) { - IndexedObject indexedObj = (IndexedObject)val; + params.filesOrDirs(workDir); - assertEquals(indexedObj.iVal, indexedObj.jVal); - assertEquals(indexedObj.iVal, key); + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); - for (byte datum : indexedObj.getData()) - assertTrue(datum >= 'A' && datum <= 'A' + 10); - } + IgniteBiInClosure<Object, Object> objConsumer = (key, val) -> { + boolean rmv = remove(ctrlMap, key, val); + + if (!rmv) + log.error("Unable to remove Key and value from control Map K:[" + key + "] V: [" + val + "]"); + + if (val instanceof IndexedObject) { + IndexedObject indexedObj = (IndexedObject)val; + + assertEquals(indexedObj.iVal, indexedObj.jVal); + assertEquals(indexedObj.iVal, key); + + for (byte datum : indexedObj.getData()) + assertTrue(datum >= 'A' && datum <= 'A' + 10); } }; - scanIterateAndCount(factory, workDir, subfolderName, cntEntries, txCnt, objConsumer, null); + scanIterateAndCount(factory, params, cntEntries, txCnt, objConsumer, null); assertTrue(" Control Map is not empty after reading entries " + ctrlMap, ctrlMap.isEmpty()); } @@ -514,8 +515,6 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * Scan WAL and WAL archive for logical records and its entries. * * @param factory WAL iterator factory. - * @param workDir Ignite work directory. - * @param subfolderName DB subfolder name based on consistent ID. * @param minCntEntries minimum expected entries count to find. * @param minTxCnt minimum expected transaction count to find. * @param objConsumer object handler, called for each object found in logical data records. @@ -523,67 +522,43 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @throws IgniteCheckedException if failed. */ private void scanIterateAndCount( - final IgniteWalIteratorFactory factory, - final String workDir, - final String subfolderName, - final int minCntEntries, - final int minTxCnt, - @Nullable final IgniteBiInClosure<Object, Object> objConsumer, - @Nullable final IgniteInClosure<DataRecord> dataRecordHnd) throws IgniteCheckedException { - - final File db = U.resolveWorkDirectory(workDir, DFLT_STORE_DIR, false); - final File wal = new File(db, "wal"); - final File walArchive = new File(wal, "archive"); - - final File walArchiveDirWithConsistentId = new File(walArchive, subfolderName); - - final File[] files = walArchiveDirWithConsistentId.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER); - A.notNull(files, "Can't iterate over files [" + walArchiveDirWithConsistentId + "] Directory is N/A"); - final WALIterator iter = factory.iteratorArchiveFiles(files); + IgniteWalIteratorFactory factory, + IteratorParametersBuilder itParamBuilder, + int minCntEntries, + int minTxCnt, + @Nullable IgniteBiInClosure<Object, Object> objConsumer, + @Nullable IgniteInClosure<DataRecord> dataRecordHnd + ) throws IgniteCheckedException { + WALIterator iter = factory.iterator(itParamBuilder); - final Map<GridCacheVersion, Integer> cntArch = iterateAndCountDataRecord(iter, objConsumer, dataRecordHnd); + Map<GridCacheVersion, Integer> cntArch = iterateAndCountDataRecord(iter, objConsumer, dataRecordHnd); int txCntObservedArch = cntArch.size(); - if (cntArch.containsKey(null)) - txCntObservedArch -= 1; // exclude non transactional updates - final int entriesArch = valuesSum(cntArch.values()); - log.info("Total tx found loaded using archive directory (file-by-file): " + txCntObservedArch); + if (cntArch.containsKey(null)) + txCntObservedArch -= 1; // Exclude non transactional updates. - final File walWorkDirWithNodeSubDir = new File(wal, subfolderName); - final File[] workFiles = walWorkDirWithNodeSubDir.listFiles(FileWriteAheadLogManager.WAL_SEGMENT_FILE_FILTER); + int entries = valuesSum(cntArch.values()); - final WALIterator tuples = factory.iteratorWorkFiles(workFiles); - final Map<GridCacheVersion, Integer> cntWork = iterateAndCountDataRecord(tuples, objConsumer, dataRecordHnd); - int txCntObservedWork = cntWork.size(); - if (cntWork.containsKey(null)) - txCntObservedWork -= 1; // exclude non transactional updates + log.info("Total tx found loaded using archive directory (file-by-file): " + txCntObservedArch); - final int entriesWork = valuesSum(cntWork.values()); - log.info("Archive directory: Tx found " + txCntObservedWork + " entries " + entriesWork); + assertTrue("txCntObservedArch=" + txCntObservedArch + " >= minTxCnt=" + minTxCnt, + txCntObservedArch >= minTxCnt); - assertTrue("entriesArch=" + entriesArch + " + entriesWork=" + entriesWork - + " >= minCntEntries=" + minCntEntries, - entriesArch + entriesWork >= minCntEntries); - assertTrue("txCntObservedWork=" + txCntObservedWork + " + txCntObservedArch=" + txCntObservedArch - + " >= minTxCnt=" + minTxCnt, - txCntObservedWork + txCntObservedArch >= minTxCnt); + assertTrue("entries=" + entries + " >= minCntEntries=" + minCntEntries, + entries >= minCntEntries); } /** * @throws Exception if failed. */ public void testFillWalWithDifferentTypes() throws Exception { - int cntEntries; + Ignite ig = startGrid(); - final Map<Object, Object> ctrlMap = new HashMap<>(); - final Map<Object, Object> ctrlMapForBinaryObjects = new HashMap<>(); - final Collection<String> ctrlStringsToSearch = new HashSet<>(); - final Collection<String> ctrlStringsForBinaryObjSearch = new HashSet<>(); - final Ignite ignite0 = startGrid("node0"); - ignite0.active(true); + ig.cluster().active(true); + + IgniteCache<Object, Object> addlCache = ig.getOrCreateCache(CACHE_ADDL_NAME); - final IgniteCache<Object, Object> addlCache = ignite0.getOrCreateCache(CACHE_ADDL_NAME); addlCache.put("1", "2"); addlCache.put(1, 2); addlCache.put(1L, 2L); @@ -597,200 +572,173 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { addlCache.put(new TestExternalizable(42), "Externalizable_As_Key"); addlCache.put(292, new IndexedObject(292)); - final String search1 = "SomeUnexpectedStringValueAsKeyToSearch"; + String search1 = "SomeUnexpectedStringValueAsKeyToSearch"; + + Collection<String> ctrlStringsToSearch = new HashSet<>(); + ctrlStringsToSearch.add(search1); + + Collection<String> ctrlStringsForBinaryObjSearch = new HashSet<>(); + ctrlStringsForBinaryObjSearch.add(search1); + addlCache.put(search1, "SearchKey"); String search2 = "SomeTestStringContainerToBePrintedLongLine"; - final TestStringContainerToBePrinted val = new TestStringContainerToBePrinted(search2); + + TestStringContainerToBePrinted val = new TestStringContainerToBePrinted(search2); + ctrlStringsToSearch.add(val.toString()); //will validate original toString() was called ctrlStringsForBinaryObjSearch.add(search2); + addlCache.put("SearchValue", val); String search3 = "SomeTestStringContainerToBePrintedLongLine2"; - final TestStringContainerToBePrinted key = new TestStringContainerToBePrinted(search3); + + TestStringContainerToBePrinted key = new TestStringContainerToBePrinted(search3); ctrlStringsToSearch.add(key.toString()); //will validate original toString() was called ctrlStringsForBinaryObjSearch.add(search3); //validate only string itself + addlCache.put(key, "SearchKey"); - cntEntries = addlCache.size(); - for (Cache.Entry<Object, Object> next : addlCache) - ctrlMap.put(next.getKey(), next.getValue()); + int cntEntries = addlCache.size(); - for (Cache.Entry<Object, Object> next : addlCache) - ctrlMapForBinaryObjects.put(next.getKey(), next.getValue()); + Map<Object, Object> ctrlMap = new HashMap<>(); + for (Cache.Entry<Object, Object> next : addlCache) + ctrlMap.put(next.getKey(), next.getValue()); - final String subfolderName = genDbSubfolderName(ignite0, 0); + Map<Object, Object> ctrlMapForBinaryObjects = new HashMap<>(); - stopGrid("node0"); + for (Cache.Entry<Object, Object> next : addlCache) + ctrlMapForBinaryObjects.put(next.getKey(), next.getValue()); - final String workDir = U.defaultWorkDirectory(); + String subfolderName = genDbSubfolderName(ig, 0); - final File binaryMeta = U.resolveWorkDirectory(workDir, "binary_meta", false); - final File binaryMetaWithNodeSubfolder = new File(binaryMeta, subfolderName); - final File marshallerMapping = U.resolveWorkDirectory(workDir, "marshaller", false); + // Wait async allocation wal segment file by archiver. + Thread.sleep(1000); - final IgniteWalIteratorFactory factory = createWalIteratorFactory(workDir, subfolderName); - final IgniteBiInClosure<Object, Object> objConsumer = new IgniteBiInClosure<Object, Object>() { - @Override public void apply(Object key, Object val) { - log.info("K: [" + key + ", " + - (key != null ? key.getClass().getName() : "?") + "]" + - " V: [" + val + ", " + - (val != null ? val.getClass().getName() : "?") + "]"); - boolean rmv = remove(ctrlMap, key, val); - if (!rmv) { - String msg = "Unable to remove pair from control map " + "K: [" + key + "] V: [" + val + "]"; - log.error(msg); - } - assertFalse(val instanceof BinaryObject); + stopGrid("node0", false); + + String workDir = U.defaultWorkDirectory(); + + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); + + IteratorParametersBuilder params0 = createIteratorParametersBuilder(workDir, subfolderName); + + params0.filesOrDirs(workDir); + + IgniteBiInClosure<Object, Object> objConsumer = (key12, val1) -> { + log.info("K: [" + key12 + ", " + + (key12 != null ? key12.getClass().getName() : "?") + "]" + + " V: [" + val1 + ", " + + (val1 != null ? val1.getClass().getName() : "?") + "]"); + boolean rmv = remove(ctrlMap, key12, val1); + if (!rmv) { + String msg = "Unable to remove pair from control map " + "K: [" + key12 + "] V: [" + val1 + "]"; + log.error(msg); } + assertFalse(val1 instanceof BinaryObject); }; - final IgniteInClosure<DataRecord> toStrChecker = new IgniteInClosure<DataRecord>() { - @Override public void apply(DataRecord record) { - String strRepresentation = record.toString(); - for (Iterator<String> iter = ctrlStringsToSearch.iterator(); iter.hasNext(); ) { - final String next = iter.next(); - if (strRepresentation.contains(next)) { - iter.remove(); - break; - } + IgniteInClosure<DataRecord> toStrChecker = record -> { + String strRepresentation = record.toString(); + + for (Iterator<String> iter = ctrlStringsToSearch.iterator(); iter.hasNext(); ) { + final String next = iter.next(); + if (strRepresentation.contains(next)) { + iter.remove(); + break; } } }; - scanIterateAndCount(factory, workDir, subfolderName, cntEntries, 0, objConsumer, toStrChecker); + + scanIterateAndCount(factory, params0, cntEntries, 0, objConsumer, toStrChecker); assertTrue(" Control Map is not empty after reading entries: " + ctrlMap, ctrlMap.isEmpty()); assertTrue(" Control Map for strings in entries is not empty after" + " reading records: " + ctrlStringsToSearch, ctrlStringsToSearch.isEmpty()); - //Validate same WAL log with flag binary objects only - final IgniteWalIteratorFactory keepBinFactory = new IgniteWalIteratorFactory(log, PAGE_SIZE, - binaryMetaWithNodeSubfolder, - marshallerMapping, - true); - final IgniteBiInClosure<Object, Object> binObjConsumer = new IgniteBiInClosure<Object, Object>() { - @Override public void apply(Object key, Object val) { - log.info("K(KeepBinary): [" + key + ", " + - (key != null ? key.getClass().getName() : "?") + "]" + - " V(KeepBinary): [" + val + ", " + - (val != null ? val.getClass().getName() : "?") + "]"); - boolean rmv = remove(ctrlMapForBinaryObjects, key, val); - if (!rmv) { - if (key instanceof BinaryObject) { - BinaryObject keyBinObj = (BinaryObject)key; - String binaryObjTypeName = keyBinObj.type().typeName(); - if (Objects.equals(TestStringContainerToBePrinted.class.getName(), binaryObjTypeName)) { - String data = keyBinObj.field("data"); - rmv = ctrlMapForBinaryObjects.remove(new TestStringContainerToBePrinted(data)) != null; - } - else if (Objects.equals(TestSerializable.class.getName(), binaryObjTypeName)) { - Integer iVal = keyBinObj.field("iVal"); - rmv = ctrlMapForBinaryObjects.remove(new TestSerializable(iVal)) != null; - } - else if (Objects.equals(TestEnum.class.getName(), binaryObjTypeName)) { - TestEnum key1 = TestEnum.values()[keyBinObj.enumOrdinal()]; - rmv = ctrlMapForBinaryObjects.remove(key1) != null; - } + IgniteBiInClosure<Object, Object> binObjConsumer = (key13, val12) -> { + log.info("K(KeepBinary): [" + key13 + ", " + + (key13 != null ? key13.getClass().getName() : "?") + "]" + + " V(KeepBinary): [" + val12 + ", " + + (val12 != null ? val12.getClass().getName() : "?") + "]"); + + boolean rmv = remove(ctrlMapForBinaryObjects, key13, val12); + + if (!rmv) { + if (key13 instanceof BinaryObject) { + BinaryObject keyBinObj = (BinaryObject)key13; + String binaryObjTypeName = keyBinObj.type().typeName(); + + if (Objects.equals(TestStringContainerToBePrinted.class.getName(), binaryObjTypeName)) { + String data = keyBinObj.field("data"); + rmv = ctrlMapForBinaryObjects.remove(new TestStringContainerToBePrinted(data)) != null; } - else if (val instanceof BinaryObject) { - //don't compare BO values, just remove by key - rmv = ctrlMapForBinaryObjects.remove(key) != null; + else if (Objects.equals(TestSerializable.class.getName(), binaryObjTypeName)) { + Integer iVal = keyBinObj.field("iVal"); + rmv = ctrlMapForBinaryObjects.remove(new TestSerializable(iVal)) != null; } - } - if (!rmv) - log.error("Unable to remove pair from control map " + "K: [" + key + "] V: [" + val + "]"); - - if (val instanceof BinaryObject) { - BinaryObject binaryObj = (BinaryObject)val; - String binaryObjTypeName = binaryObj.type().typeName(); - if (Objects.equals(IndexedObject.class.getName(), binaryObjTypeName)) { - assertEquals(binaryObj.field("iVal").toString(), - binaryObj.field("jVal").toString()); - - byte data[] = binaryObj.field("data"); - for (byte datum : data) - assertTrue(datum >= 'A' && datum <= 'A' + 10); + else if (Objects.equals(TestEnum.class.getName(), binaryObjTypeName)) { + TestEnum key1 = TestEnum.values()[keyBinObj.enumOrdinal()]; + rmv = ctrlMapForBinaryObjects.remove(key1) != null; } } + else if (val12 instanceof BinaryObject) { + //don't compare BO values, just remove by key + rmv = ctrlMapForBinaryObjects.remove(key13) != null; + } } - }; + if (!rmv) + log.error("Unable to remove pair from control map " + "K: [" + key13 + "] V: [" + val12 + "]"); - final IgniteInClosure<DataRecord> binObjToStrChecker = new IgniteInClosure<DataRecord>() { - @Override public void apply(DataRecord record) { - String strRepresentation = record.toString(); + if (val12 instanceof BinaryObject) { + BinaryObject binaryObj = (BinaryObject)val12; + String binaryObjTypeName = binaryObj.type().typeName(); - for (Iterator<String> iter = ctrlStringsForBinaryObjSearch.iterator(); iter.hasNext(); ) { - final String next = iter.next(); + if (Objects.equals(IndexedObject.class.getName(), binaryObjTypeName)) { + assertEquals( + binaryObj.field("iVal").toString(), + binaryObj.field("jVal").toString() + ); - if (strRepresentation.contains(next)) { - iter.remove(); + byte data[] = binaryObj.field("data"); - break; - } + for (byte datum : data) + assertTrue(datum >= 'A' && datum <= 'A' + 10); } } }; - scanIterateAndCount(keepBinFactory, workDir, subfolderName, cntEntries, 0, binObjConsumer, binObjToStrChecker); - - assertTrue(" Control Map is not empty after reading entries: " + ctrlMapForBinaryObjects, - ctrlMapForBinaryObjects.isEmpty()); - assertTrue(" Control Map for strings in entries is not empty after" + - " reading records: " + ctrlStringsForBinaryObjSearch, - ctrlStringsForBinaryObjSearch.isEmpty()); - - } - - /** - * Tests archive completed event is fired. - * - * @throws Exception if failed. - */ - public void testFillWalForExactSegmentsCount() throws Exception { - customWalMode = WALMode.FSYNC; - - final CountDownLatch reqSegments = new CountDownLatch(15); - final Ignite ignite = startGrid("node0"); - - ignite.active(true); + IgniteInClosure<DataRecord> binObjToStrChecker = record -> { + String strRepresentation = record.toString(); - final IgniteEvents evts = ignite.events(); - - if (!evts.isEnabled(EVT_WAL_SEGMENT_ARCHIVED)) - assertTrue("nothing to test", false); + for (Iterator<String> iter = ctrlStringsForBinaryObjSearch.iterator(); iter.hasNext(); ) { + final String next = iter.next(); - evts.localListen(new IgnitePredicate<Event>() { - @Override public boolean apply(Event e) { - WalSegmentArchivedEvent archComplEvt = (WalSegmentArchivedEvent)e; - long idx = archComplEvt.getAbsWalSegmentIdx(); - log.info("Finished archive for segment [" + idx + ", " + - archComplEvt.getArchiveFile() + "]: [" + e + "]"); + if (strRepresentation.contains(next)) { + iter.remove(); - reqSegments.countDown(); - return true; + break; + } } - }, EVT_WAL_SEGMENT_ARCHIVED); + }; + IteratorParametersBuilder params1 = createIteratorParametersBuilder(workDir, subfolderName); - int totalEntries = 0; - while (reqSegments.getCount() > 0) { - final int write = 500; - putAllDummyRecords(ignite, write); - totalEntries += write; - Assert.assertTrue("Too much entries generated, but segments was not become available", - totalEntries < 10000); - } - final String subfolderName = genDbSubfolderName(ignite, 0); + params1.filesOrDirs(workDir).keepBinary(true); - stopGrid("node0"); + //Validate same WAL log with flag binary objects only + IgniteWalIteratorFactory keepBinFactory = new IgniteWalIteratorFactory(log); - final String workDir = U.defaultWorkDirectory(); - final IgniteWalIteratorFactory factory = createWalIteratorFactory(workDir, subfolderName); + scanIterateAndCount(keepBinFactory, params1, cntEntries, 0, binObjConsumer, binObjToStrChecker); - scanIterateAndCount(factory, workDir, subfolderName, totalEntries, 0, null, null); + assertTrue(" Control Map is not empty after reading entries: " + + ctrlMapForBinaryObjects, ctrlMapForBinaryObjects.isEmpty()); + + assertTrue(" Control Map for strings in entries is not empty after" + + " reading records: " + ctrlStringsForBinaryObjSearch, ctrlStringsForBinaryObjSearch.isEmpty()); } /** @@ -801,19 +749,32 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { public void testReadEmptyWal() throws Exception { customWalMode = WALMode.FSYNC; - final Ignite ignite = startGrid("node0"); + Ignite ignite = startGrid(); - ignite.active(true); - ignite.active(false); + ignite.cluster().active(true); + + ignite.cluster().active(false); final String subfolderName = genDbSubfolderName(ignite, 0); - stopGrid("node0"); + stopGrid(); + + String workDir = U.defaultWorkDirectory(); + + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); - final String workDir = U.defaultWorkDirectory(); - final IgniteWalIteratorFactory factory = createWalIteratorFactory(workDir, subfolderName); + IteratorParametersBuilder iteratorParametersBuilder = + createIteratorParametersBuilder(workDir, subfolderName) + .filesOrDirs(workDir); - scanIterateAndCount(factory, workDir, subfolderName, 0, 0, null, null); + scanIterateAndCount( + factory, + iteratorParametersBuilder, + 0, + 0, + null, + null + ); } /** @@ -873,54 +834,72 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @param mode Cache Atomicity Mode. */ private void runRemoveOperationTest(CacheAtomicityMode mode) throws Exception { - final Ignite ignite = startGrid("node0"); + Ignite ignite = startGrid(); + + ignite.cluster().active(true); - ignite.active(true); createCache2(ignite, mode); - ignite.active(false); - final String subfolderName = genDbSubfolderName(ignite, 0); + ignite.cluster().active(false); + + String subfolderName = genDbSubfolderName(ignite, 0); + + stopGrid(); + + String workDir = U.defaultWorkDirectory(); + + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); - stopGrid("node0"); + IteratorParametersBuilder params = createIteratorParametersBuilder(workDir, subfolderName); - final String workDir = U.defaultWorkDirectory(); - final IgniteWalIteratorFactory factory = createWalIteratorFactory(workDir, subfolderName); + params.filesOrDirs(workDir); - final StringBuilder builder = new StringBuilder(); - final Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class); + StringBuilder sb = new StringBuilder(); - scanIterateAndCount(factory, workDir, subfolderName, 0, 0, null, new IgniteInClosure<DataRecord>() { - @Override public void apply(DataRecord dataRecord) { + Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class); + + scanIterateAndCount( + factory, + params, + 0, + 0, + null, + dataRecord -> { final List<DataEntry> entries = dataRecord.writeEntries(); - builder.append("{"); + sb.append("{"); + for (DataEntry entry : entries) { - final GridCacheOperation op = entry.op(); - final Integer cnt = operationsFound.get(op); + GridCacheOperation op = entry.op(); + Integer cnt = operationsFound.get(op); operationsFound.put(op, cnt == null ? 1 : (cnt + 1)); if (entry instanceof UnwrapDataEntry) { - final UnwrapDataEntry entry1 = (UnwrapDataEntry)entry; + UnwrapDataEntry entry1 = (UnwrapDataEntry)entry; - builder.append(entry1.op()).append(" for ").append(entry1.unwrappedKey()); - final GridCacheVersion ver = entry.nearXidVersion(); + sb.append(entry1.op()) + .append(" for ") + .append(entry1.unwrappedKey()); - builder.append(", "); + GridCacheVersion ver = entry.nearXidVersion(); + + sb.append(", "); if (ver != null) - builder.append("tx=").append(ver).append(", "); + sb.append("tx=") + .append(ver) + .append(", "); } } - builder.append("}\n"); - } - }); + sb.append("}\n"); + }); final Integer deletesFound = operationsFound.get(DELETE); if (log.isInfoEnabled()) - log.info(builder.toString()); + log.info(sb.toString()); assertTrue("Delete operations should be found in log: " + operationsFound, deletesFound != null && deletesFound > 0); @@ -928,109 +907,144 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { /** * Tests transaction generation and WAL for putAll cache operation. + * * @throws Exception if failed. */ public void testPutAllTxIntoTwoNodes() throws Exception { - final Ignite ignite = startGrid("node0"); - final Ignite ignite1 = startGrid(1); + Ignite ignite = startGrid("node0"); + Ignite ignite1 = startGrid(1); + + ignite.cluster().active(true); - ignite.active(true); + Map<Object, IndexedObject> map = new TreeMap<>(); - final Map<Object, IndexedObject> map = new TreeMap<>(); + int cntEntries = 1000; - final int cntEntries = 1000; for (int i = 0; i < cntEntries; i++) map.put(i, new IndexedObject(i)); ignite.cache(CACHE_NAME).putAll(map); - ignite.active(false); + ignite.cluster().active(false); - final String subfolderName = genDbSubfolderName(ignite, 0); - final String subfolderName1 = genDbSubfolderName(ignite1, 1); + String subfolderName1 = genDbSubfolderName(ignite, 0); + String subfolderName2 = genDbSubfolderName(ignite1, 1); stopAllGrids(); - final String workDir = U.defaultWorkDirectory(); - final IgniteWalIteratorFactory factory = createWalIteratorFactory(workDir, subfolderName); + String workDir = U.defaultWorkDirectory(); - final StringBuilder builder = new StringBuilder(); - final Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class); + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(log); - final IgniteInClosure<DataRecord> drHnd = new IgniteInClosure<DataRecord>() { - @Override public void apply(DataRecord dataRecord) { - final List<DataEntry> entries = dataRecord.writeEntries(); + StringBuilder sb = new StringBuilder(); - builder.append("{"); - for (DataEntry entry : entries) { - final GridCacheOperation op = entry.op(); - final Integer cnt = operationsFound.get(op); + Map<GridCacheOperation, Integer> operationsFound = new EnumMap<>(GridCacheOperation.class); - operationsFound.put(op, cnt == null ? 1 : (cnt + 1)); + IgniteInClosure<DataRecord> drHnd = dataRecord -> { + List<DataEntry> entries = dataRecord.writeEntries(); - if (entry instanceof UnwrapDataEntry) { - final UnwrapDataEntry entry1 = (UnwrapDataEntry)entry; + sb.append("{"); - builder.append(entry1.op()).append(" for ").append(entry1.unwrappedKey()); - final GridCacheVersion ver = entry.nearXidVersion(); + for (DataEntry entry : entries) { + GridCacheOperation op = entry.op(); + Integer cnt = operationsFound.get(op); - builder.append(", "); + operationsFound.put(op, cnt == null ? 1 : (cnt + 1)); - if (ver != null) - builder.append("tx=").append(ver).append(", "); - } - } + if (entry instanceof UnwrapDataEntry) { + final UnwrapDataEntry entry1 = (UnwrapDataEntry)entry; + + sb.append(entry1.op()).append(" for ").append(entry1.unwrappedKey()); + final GridCacheVersion ver = entry.nearXidVersion(); + + sb.append(", "); - builder.append("}\n"); + if (ver != null) + sb.append("tx=").append(ver).append(", "); + } } + + sb.append("}\n"); }; - scanIterateAndCount(factory, workDir, subfolderName, 1, 1, null, drHnd); - scanIterateAndCount(factory, workDir, subfolderName1, 1, 1, null, drHnd); - final Integer createsFound = operationsFound.get(CREATE); + scanIterateAndCount( + factory, + createIteratorParametersBuilder(workDir, subfolderName1) + .filesOrDirs( + workDir + "/db/wal/" + subfolderName1, + workDir + "/db/wal/archive/" + subfolderName1 + ), + 1, + 1, + null, drHnd + ); + + scanIterateAndCount( + factory, + createIteratorParametersBuilder(workDir, subfolderName2) + .filesOrDirs( + workDir + "/db/wal/" + subfolderName2, + workDir + "/db/wal/archive/" + subfolderName2 + ), + 1, + 1, + null, + drHnd + ); + + Integer createsFound = operationsFound.get(CREATE); if (log.isInfoEnabled()) - log.info(builder.toString()); + log.info(sb.toString()); assertTrue("Create operations should be found in log: " + operationsFound, createsFound != null && createsFound > 0); assertTrue("Create operations count should be at least " + cntEntries + " in log: " + operationsFound, createsFound != null && createsFound >= cntEntries); - } /** * Tests transaction generation and WAL for putAll cache operation. + * * @throws Exception if failed. */ public void testTxRecordsReadWoBinaryMeta() throws Exception { clearProperties = true; + System.setProperty(IgniteSystemProperties.IGNITE_WAL_LOG_TX_RECORDS, "true"); - final Ignite ignite = startGrid("node0"); - ignite.active(true); + Ignite ignite = startGrid("node0"); - final Map<Object, IndexedObject> map = new TreeMap<>(); + ignite.cluster().active(true); + + Map<Object, IndexedObject> map = new TreeMap<>(); for (int i = 0; i < 1000; i++) map.put(i, new IndexedObject(i)); ignite.cache(CACHE_NAME).putAll(map); - ignite.active(false); + ignite.cluster().active(false); + + String workDir = U.defaultWorkDirectory(); + + String subfolderName = genDbSubfolderName(ignite, 0); - final String workDir = U.defaultWorkDirectory(); - final String subfolderName = genDbSubfolderName(ignite, 0); stopAllGrids(); - IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(new NullLogger(), - PAGE_SIZE, - null, - null, - false); + IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(new NullLogger()); + + IteratorParametersBuilder params = createIteratorParametersBuilder(workDir, subfolderName); - scanIterateAndCount(factory, workDir, subfolderName, 1000, 1, null, null); + scanIterateAndCount( + factory, + params.filesOrDirs(workDir), + 1000, + 1, + null, + null + ); } /** @@ -1039,19 +1053,17 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @return WAL iterator factory. * @throws IgniteCheckedException If failed. */ - @NotNull private IgniteWalIteratorFactory createWalIteratorFactory( - final String workDir, - final String subfolderName + @NotNull private IteratorParametersBuilder createIteratorParametersBuilder( + String workDir, + String subfolderName ) throws IgniteCheckedException { - final File binaryMeta = U.resolveWorkDirectory(workDir, "binary_meta", false); - final File binaryMetaWithConsId = new File(binaryMeta, subfolderName); - final File marshallerMapping = U.resolveWorkDirectory(workDir, "marshaller", false); - - return new IgniteWalIteratorFactory(log, - PAGE_SIZE, - binaryMetaWithConsId, - marshallerMapping, - false); + File binaryMeta = U.resolveWorkDirectory(workDir, "binary_meta", false); + File binaryMetaWithConsId = new File(binaryMeta, subfolderName); + File marshallerMapping = U.resolveWorkDirectory(workDir, "marshaller", false); + + return new IteratorParametersBuilder() + .binaryMetadataFileStoreDir(binaryMetaWithConsId) + .marshallerMappingFileStoreDir(marshallerMapping); } /** @@ -1075,28 +1087,33 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @throws IgniteCheckedException if failure. */ private Map<GridCacheVersion, Integer> iterateAndCountDataRecord( - final WALIterator walIter, - @Nullable final IgniteBiInClosure<Object, Object> cacheObjHnd, - @Nullable final IgniteInClosure<DataRecord> dataRecordHnd) throws IgniteCheckedException { + WALIterator walIter, + @Nullable IgniteBiInClosure<Object, Object> cacheObjHnd, + @Nullable IgniteInClosure<DataRecord> dataRecordHnd + ) throws IgniteCheckedException { - final Map<GridCacheVersion, Integer> entriesUnderTxFound = new HashMap<>(); + Map<GridCacheVersion, Integer> entriesUnderTxFound = new HashMap<>(); try (WALIterator stIt = walIter) { while (stIt.hasNextX()) { - final IgniteBiTuple<WALPointer, WALRecord> next = stIt.nextX(); - final WALRecord walRecord = next.get2(); + IgniteBiTuple<WALPointer, WALRecord> tup = stIt.nextX(); - if (walRecord.type() == WALRecord.RecordType.DATA_RECORD && walRecord instanceof DataRecord) { - final DataRecord dataRecord = (DataRecord)walRecord; + WALRecord walRecord = tup.get2(); + + if (walRecord.type() == DATA_RECORD && walRecord instanceof DataRecord) { + DataRecord dataRecord = (DataRecord)walRecord; if (dataRecordHnd != null) dataRecordHnd.apply(dataRecord); - final List<DataEntry> entries = dataRecord.writeEntries(); + + List<DataEntry> entries = dataRecord.writeEntries(); for (DataEntry entry : entries) { - final GridCacheVersion globalTxId = entry.nearXidVersion(); + GridCacheVersion globalTxId = entry.nearXidVersion(); + Object unwrappedKeyObj; Object unwrappedValObj; + if (entry instanceof UnwrapDataEntry) { UnwrapDataEntry unwrapDataEntry = (UnwrapDataEntry)entry; unwrappedKeyObj = unwrapDataEntry.unwrappedKey(); @@ -1117,7 +1134,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { unwrappedKeyObj = key instanceof BinaryObject ? key : key.value(null, false); } - if (dumpRecords) + if (DUMP_RECORDS) log.info("//Entry operation " + entry.op() + "; cache Id" + entry.cacheId() + "; " + "under transaction: " + globalTxId + //; entry " + entry + @@ -1127,23 +1144,78 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { if (cacheObjHnd != null && (unwrappedKeyObj != null || unwrappedValObj != null)) cacheObjHnd.apply(unwrappedKeyObj, unwrappedValObj); - final Integer entriesUnderTx = entriesUnderTxFound.get(globalTxId); + Integer entriesUnderTx = entriesUnderTxFound.get(globalTxId); + entriesUnderTxFound.put(globalTxId, entriesUnderTx == null ? 1 : entriesUnderTx + 1); } } - else if (walRecord.type() == WALRecord.RecordType.TX_RECORD && walRecord instanceof TxRecord) { - final TxRecord txRecord = (TxRecord)walRecord; - final GridCacheVersion globalTxId = txRecord.nearXidVersion(); + else if (walRecord.type() == TX_RECORD && walRecord instanceof TxRecord) { + TxRecord txRecord = (TxRecord)walRecord; + GridCacheVersion globalTxId = txRecord.nearXidVersion(); - if (dumpRecords) + if (DUMP_RECORDS) log.info("//Tx Record, state: " + txRecord.state() + "; nearTxVersion" + globalTxId); } } } + return entriesUnderTxFound; } + /** + * Puts provided number of records to fill WAL. + * + * @param ignite ignite instance. + * @param recordsToWrite count. + */ + private void putDummyRecords(Ignite ignite, int recordsToWrite) { + IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME); + + for (int i = 0; i < recordsToWrite; i++) + cache0.put(i, new IndexedObject(i)); + } + + /** + * Puts provided number of records to fill WAL. + * + * @param ignite ignite instance. + * @param recordsToWrite count. + */ + private void putAllDummyRecords(Ignite ignite, int recordsToWrite) { + IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME); + + Map<Object, Object> values = new HashMap<>(); + + for (int i = 0; i < recordsToWrite; i++) + values.put(i, new IndexedObject(i)); + + cache0.putAll(values); + } + + /** + * Puts provided number of records to fill WAL under transactions. + * + * @param ignite ignite instance. + * @param recordsToWrite count. + * @param txCnt transactions to run. If number is less then records count, txCnt records will be written. + */ + private IgniteCache<Object, Object> txPutDummyRecords(Ignite ignite, int recordsToWrite, int txCnt) { + IgniteCache<Object, Object> cache0 = ignite.cache(CACHE_NAME); + int keysPerTx = recordsToWrite / txCnt; + if (keysPerTx == 0) + keysPerTx = 1; + for (int t = 0; t < txCnt; t++) { + try (Transaction tx = ignite.transactions().txStart()) { + for (int i = t * keysPerTx; i < (t + 1) * keysPerTx; i++) + cache0.put(i, new IndexedObject(i)); + + tx.commit(); + } + } + return cache0; + } + /** Enum for cover binaryObject enum save/load. */ enum TestEnum { /** */A, /** */B, /** */C @@ -1258,7 +1330,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * * @param data value to be searched in to String */ - public TestStringContainerToBePrinted(String data) { + TestStringContainerToBePrinted(String data) { this.data = data; } @@ -1298,7 +1370,7 @@ public class IgniteWalReaderTest extends GridCommonAbstractTest { * @param key Key. * @param name Name. */ - public Organization(int key, String name) { + Organization(int key, String name) { this.key = key; this.name = name; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java index 0188445..0a240ea 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/NoOpWALManager.java @@ -150,4 +150,9 @@ public class NoOpWALManager implements IgniteWriteAheadLogManager { @Override public int walArchiveSegments() { return 0; } + + /** {@inheritDoc} */ + @Override public long lastArchivedSegment() { + return -1L; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 3b46761..1b7111a 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalF import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFlushLogOnlyWithMmapBufferSelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalFormatFileFailoverTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalHistoryReservationsTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorExceptionDuringReadTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest; @@ -174,5 +175,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgnitePdsCorruptedCacheDataTest.class); suite.addTestSuite(IgniteWalIteratorSwitchSegmentTest.class); + + suite.addTestSuite(IgniteWalIteratorExceptionDuringReadTest.class); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d6ab2ae6/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java ---------------------------------------------------------------------- diff --git a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java index 2da1aa3..eee193a 100644 --- a/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java +++ b/modules/dev-utils/src/main/java/org/apache/ignite/development/utils/IgniteWalConverter.java @@ -54,11 +54,7 @@ public class IgniteWalConverter { boolean printRecords = IgniteSystemProperties.getBoolean("PRINT_RECORDS", false); boolean printStat = IgniteSystemProperties.getBoolean("PRINT_STAT", true); - final IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(new NullLogger(), - Integer.parseInt(args[0]), - null, - null, - false); + final IgniteWalIteratorFactory factory = new IgniteWalIteratorFactory(new NullLogger()); final File walWorkDirWithConsistentId = new File(args[1]); @@ -69,7 +65,7 @@ public class IgniteWalConverter { @Nullable final WalStat stat = printStat ? new WalStat() : null; - try (WALIterator stIt = factory.iteratorWorkFiles(workFiles)) { + try (WALIterator stIt = factory.iterator(workFiles)) { while (stIt.hasNextX()) { IgniteBiTuple<WALPointer, WALRecord> next = stIt.nextX(); @@ -87,7 +83,7 @@ public class IgniteWalConverter { if (args.length >= 3) { final File walArchiveDirWithConsistentId = new File(args[2]); - try (WALIterator stIt = factory.iteratorArchiveDirectory(walArchiveDirWithConsistentId)) { + try (WALIterator stIt = factory.iterator(walArchiveDirWithConsistentId)) { while (stIt.hasNextX()) { IgniteBiTuple<WALPointer, WALRecord> next = stIt.nextX();