This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new defc398 IGNITE-13877 Restructuring WAL work directory after enabling WAL archive. - Fixes #8681. defc398 is described below commit defc39832e53b4032ba2883fbdaed77228c99ac2 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Wed Feb 10 09:48:14 2021 +0300 IGNITE-13877 Restructuring WAL work directory after enabling WAL archive. - Fixes #8681. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../persistence/wal/FileWriteAheadLogManager.java | 228 +++++++++++++--- .../persistence/wal/WalArchiveConsistencyTest.java | 294 +++++++++++++++++++++ .../testframework/junits/GridAbstractTest.java | 16 ++ .../ignite/testsuites/IgnitePdsTestSuite.java | 3 + 4 files changed, 508 insertions(+), 33 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index a7eee3c..a22ef7a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -50,7 +50,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongArray; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.regex.Pattern; -import java.util.stream.Collectors; import java.util.zip.ZipEntry; import java.util.zip.ZipInputStream; import java.util.zip.ZipOutputStream; @@ -127,9 +126,12 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.thread.IgniteThread; import org.jetbrains.annotations.Nullable; +import static java.nio.file.StandardCopyOption.ATOMIC_MOVE; +import static java.nio.file.StandardCopyOption.REPLACE_EXISTING; import static java.nio.file.StandardOpenOption.CREATE; import static java.nio.file.StandardOpenOption.READ; import static java.nio.file.StandardOpenOption.WRITE; +import static java.util.stream.Collectors.toList; import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT; import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE; @@ -216,13 +218,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, DFLT_WAL_MMAP); /** - * Percentage of WAL archive size to calculate threshold since which removing of old archive should be started. - */ - private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE = - IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, - DFLT_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE); - - /** * Number of WAL compressor worker threads. */ private final int WAL_COMPRESSOR_WORKER_THREAD_CNT = @@ -407,7 +402,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl segmentFileInputFactory = new SimpleSegmentFileInputFactory(); walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity(); - allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE); + double thresholdWalArchiveSizePercentage = IgniteSystemProperties.getDouble( + IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, DFLT_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE); + + allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * thresholdWalArchiveSizePercentage); evt = ctx.event(); failureProcessor = ctx.failure(); @@ -462,8 +460,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl metrics = dbMgr.persistentStoreMetricsImpl(); - checkOrPrepareFiles(); - if (metrics != null) { metrics.setWalSizeProvider(new CO<Long>() { /** {@inheritDoc} */ @@ -492,11 +488,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } if (isArchiverEnabled()) - archiver = new FileArchiver(segmentAware, log); + archiver = new FileArchiver(log); if (!walArchiveUnlimited()) cleaner = new FileCleaner(log); + prepareAndCheckWalFiles(); + + if (compressor != null) + compressor.initAlreadyCompressedSegments(); + + if (archiver != null) + archiver.init(segmentAware); + segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg); fileHandleManager = fileHandleManagerFactory.build( @@ -1502,11 +1506,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Deletes temp files creates and prepares new; Creates the first segment if necessary. + * Prepare and check WAL files. * * @throws StorageException If failed. */ - private void checkOrPrepareFiles() throws StorageException { + private void prepareAndCheckWalFiles() throws StorageException { Collection<File> tmpFiles = new HashSet<>(); for (File walDir : F.asList(walWorkDir, walArchiveDir)) { @@ -1521,21 +1525,18 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } } - File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER); + if (F.isEmpty(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER))) + createFile(new File(walWorkDir, fileName(0))); - if (isArchiverEnabled() && !F.isEmpty(allFiles) && allFiles.length > dsCfg.getWalSegments()) { - throw new StorageException("Failed to initialize wal (work directory contains incorrect " + - "number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); - } + if (isArchiverEnabled()) { + moveSegmentsToArchive(); - // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. - if (F.isEmpty(allFiles)) { - File first = new File(walWorkDir, fileName(0)); + renameLastSegment(); + + formatWorkSegments(); - createFile(first); - } - else if (isArchiverEnabled()) checkFiles(0, false, null, null); + } } /** @@ -1724,14 +1725,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * Constructor. * - * @param segmentAware Segment aware. * @param log Logger. */ - private FileArchiver(SegmentAware segmentAware, IgniteLogger log) throws IgniteCheckedException { + private FileArchiver(IgniteLogger log) { super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log, cctx.kernalContext().workersRegistry()); - - init(segmentAware); } /** @@ -2048,7 +2046,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** * Background creation of all segments except first. First segment was created in main thread by {@link - * FileWriteAheadLogManager#checkOrPrepareFiles()} + * FileWriteAheadLogManager#prepareAndCheckWalFiles()} */ private void allocateRemainingFiles() throws StorageException { checkFiles( @@ -2092,8 +2090,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl */ FileCompressor(IgniteLogger log) { super(0, log); - - initAlreadyCompressedSegments(); } /** */ @@ -2936,7 +2932,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (files == null) return Collections.emptyList(); - return Arrays.stream(files).map(File::getName).sorted().collect(Collectors.toList()); + return Arrays.stream(files).map(File::getName).sorted().collect(toList()); } /** {@inheritDoc} */ @@ -3252,4 +3248,170 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl new IgniteThread(this).start(); } } + + /** + * Moving working segments to archive, if segments are more than {@link DataStorageConfiguration#getWalSegments()} + * or index of first segment is not 0. All segments will be moved except for last one, + * as well as all compressed segments. + * + * @throws StorageException If an error occurs while moving. + */ + private void moveSegmentsToArchive() throws StorageException { + assert isArchiverEnabled(); + + FileDescriptor[] workSegments = scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER)); + + List<FileDescriptor> toMove = new ArrayList<>(); + + if (!F.isEmpty(workSegments) && (workSegments.length > dsCfg.getWalSegments() || workSegments[0].idx() != 0)) + toMove.addAll(F.asList(workSegments).subList(0, workSegments.length - 1)); + + toMove.addAll(F.asList(scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)))); + + if (!toMove.isEmpty()) { + log.warning("Content of WAL working directory needs rearrangement, some WAL segments will be moved to " + + "archive: " + walArchiveDir.getAbsolutePath() + ". Segments from " + toMove.get(0).file().getName() + + " to " + toMove.get(toMove.size() - 1).file().getName() + " will be moved, total number of files: " + + toMove.size() + ". This operation may take some time."); + + for (int i = 0, j = 0; i < toMove.size(); i++) { + FileDescriptor fd = toMove.get(i); + + File tmpDst = new File(walArchiveDir, fd.file().getName() + TMP_SUFFIX); + File dst = new File(walArchiveDir, fd.file().getName()); + + try { + Files.copy(fd.file().toPath(), tmpDst.toPath()); + + Files.move(tmpDst.toPath(), dst.toPath()); + + Files.delete(fd.file().toPath()); + + if (log.isDebugEnabled()) { + log.debug("WAL segment moved [src=" + fd.file().getAbsolutePath() + + ", dst=" + dst.getAbsolutePath() + ']'); + } + + // Batch output. + if (log.isInfoEnabled() && (i == toMove.size() - 1 || (i != 0 && i % 9 == 0))) { + log.info("WAL segments moved: " + toMove.get(j).file().getName() + + (i == j ? "" : " - " + toMove.get(i).file().getName())); + + j = i + 1; + } + } + catch (IOException e) { + throw new StorageException("Failed to move WAL segment [src=" + fd.file().getAbsolutePath() + + ", dst=" + dst.getAbsolutePath() + ']', e); + } + } + } + } + + /** + * Renaming last segment if it is only one and its index is greater than {@link DataStorageConfiguration#getWalSegments()}. + * + * @throws StorageException If an error occurs while renaming. + */ + private void renameLastSegment() throws StorageException { + assert isArchiverEnabled(); + + FileDescriptor[] workSegments = scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER)); + + if (workSegments.length == 1 && workSegments[0].idx() != workSegments[0].idx() % dsCfg.getWalSegments()) { + FileDescriptor toRen = workSegments[0]; + + if (log.isInfoEnabled()) { + log.info("Last WAL segment file has to be renamed from " + toRen.file().getName() + " to " + + fileName(toRen.idx() % dsCfg.getWalSegments()) + '.'); + } + + String toRenFileName = fileName(toRen.idx() % dsCfg.getWalSegments()); + + File tmpDst = new File(walWorkDir, toRenFileName + TMP_SUFFIX); + File dst = new File(walWorkDir, toRenFileName); + + try { + Files.copy(toRen.file().toPath(), tmpDst.toPath()); + + Files.move(tmpDst.toPath(), dst.toPath()); + + Files.delete(toRen.file().toPath()); + + if (log.isInfoEnabled()) { + log.info("WAL segment renamed [src=" + toRen.file().getAbsolutePath() + + ", dst=" + dst.getAbsolutePath() + ']'); + } + } + catch (IOException e) { + throw new StorageException("Failed to rename WAL segment [src=" + + toRen.file().getAbsolutePath() + ", dst=" + dst.getAbsolutePath() + ']', e); + } + } + } + + /** + * Formatting working segments to {@link DataStorageConfiguration#getWalSegmentSize()} for work in a mmap or fsync case. + * + * @throws StorageException If an error occurs when formatting. + */ + private void formatWorkSegments() throws StorageException { + assert isArchiverEnabled(); + + if (mode == WALMode.FSYNC || mmap) { + List<FileDescriptor> toFormat = Arrays.stream(scan(walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER))) + .filter(fd -> fd.file().length() < dsCfg.getWalSegmentSize()).collect(toList()); + + if (!toFormat.isEmpty()) { + if (log.isInfoEnabled()) { + log.info("WAL segments in working directory should have the same size: '" + + U.humanReadableByteCount(dsCfg.getWalSegmentSize()) + "'. Segments that need reformat " + + "found: " + F.viewReadOnly(toFormat, fd -> fd.file().getName()) + '.'); + } + + for (int i = 0, j = 0; i < toFormat.size(); i++) { + FileDescriptor fd = toFormat.get(i); + + File tmpDst = new File(fd.file().getName() + TMP_SUFFIX); + + try { + Files.copy(fd.file().toPath(), tmpDst.toPath()); + + if (log.isDebugEnabled()) { + log.debug("Start formatting WAL segment [filePath=" + tmpDst.getAbsolutePath() + + ", fileSize=" + U.humanReadableByteCount(tmpDst.length()) + + ", toSize=" + U.humanReadableByteCount(dsCfg.getWalSegmentSize()) + ']'); + } + + try (FileIO fileIO = ioFactory.create(tmpDst, CREATE, READ, WRITE)) { + int left = (int)(dsCfg.getWalSegmentSize() - tmpDst.length()); + + fileIO.position(tmpDst.length()); + + while (left > 0) + left -= fileIO.writeFully(FILL_BUF, 0, Math.min(FILL_BUF.length, left)); + + fileIO.force(); + } + + Files.move(tmpDst.toPath(), fd.file().toPath(), REPLACE_EXISTING, ATOMIC_MOVE); + + if (log.isDebugEnabled()) + log.debug("WAL segment formatted: " + fd.file().getAbsolutePath()); + + // Batch output. + if (log.isInfoEnabled() && (i == toFormat.size() - 1 || (i != 0 && i % 9 == 0))) { + log.info("WAL segments formatted: " + toFormat.get(j).file().getName() + + (i == j ? "" : " - " + fileName(i))); + + j = i + 1; + } + } + catch (IOException e) { + throw new StorageException("Failed to format WAL segment: " + fd.file().getAbsolutePath(), e); + } + } + } + } + } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java new file mode 100644 index 0000000..43a26c2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/wal/WalArchiveConsistencyTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.wal; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.WALMode; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE; +import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_WAL_PATH; +import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; + +/** + * Class for testing cases when WAL archive configuration was changed and the node was able to start. + */ +@RunWith(Parameterized.class) +@WithSystemProperty(key = IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, value = "0.0") +public class WalArchiveConsistencyTest extends GridCommonAbstractTest { + /** + * WAL mode. + */ + @Parameterized.Parameter + public WALMode walMode; + + /** + * @return Test parameters. + */ + @Parameterized.Parameters(name = "walMode={0}") + public static Collection<Object[]> parameters() { + return Arrays.asList( + new Object[] {WALMode.LOG_ONLY}, + new Object[] {WALMode.FSYNC} + ); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)) + .setDataStorageConfiguration( + new DataStorageConfiguration() + .setWalSegments(10) + .setWalSegmentSize((int)U.MB) + .setMaxWalArchiveSize(10 * U.MB) + .setWalMode(walMode) + .setWalFsyncDelayNanos(100) + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(U.GB) + ) + ); + } + + /** {@inheritDoc} */ + @Override protected IgniteEx startGrid(int idx, Consumer<IgniteConfiguration> cfgOp) throws Exception { + IgniteEx n = super.startGrid(idx, cfgOp); + + n.cluster().state(ClusterState.ACTIVE); + awaitPartitionMapExchange(); + + return n; + } + + /** + * Verify that when switching WAL archive off -> on and increasing the + * number of WAL segments on restarting the node, the recovery will be consistent. + * + * @throws Exception If failed. + */ + @Test + public void testIncreaseWalSegmentsWithoutTruncate() throws Exception { + checkRecoveryWithoutWalTruncate(12); + } + + /** + * Verify that when switching WAL archive off -> on and decreasing the + * number of WAL segments on restarting the node, the recovery will be consistent. + * + * @throws Exception If failed. + */ + @Test + public void testDecreaseWalSegmentsWithoutTruncate() throws Exception { + checkRecoveryWithoutWalTruncate(4); + } + + /** + * Checking that when switching WAL archive off -> on, + * reducing WAL segments at the start of the node + * and truncation some WAL segments, the recovery will be consistent. + * + * @throws Exception If failed. + */ + @Test + public void testDecreaseWalSegmentsWithTruncate0() throws Exception { + checkRecoveryWithWalTruncate(5); + } + + /** + * Checking that when switching WAL archive off -> on, + * reducing WAL segments at the start of the node + * and truncation some WAL segments, the recovery will be consistent. + * + * @throws Exception If failed. + */ + @Test + public void testDecreaseWalSegmentsWithTruncate1() throws Exception { + checkRecoveryWithWalTruncate(6); + } + + /** + * Checking that when switching WAL archive off -> on + * and truncation some WAL segments, the recovery will be consistent. + * + * @throws Exception If failed. + */ + @Test + public void testNotChangeWalSegmentsWithTruncate() throws Exception { + checkRecoveryWithWalTruncate(10); + } + + /** + * Checking the consistency of recovery from a WAL when switching + * WAL archive off -> on and changing the number of segments on node restart. + * With truncate WAL segments. + * + * @param segments Segment count on node restart. + * @throws Exception If failed. + */ + private void checkRecoveryWithWalTruncate(int segments) throws Exception { + IgniteEx n = startGrid(0, cfg -> { + cfg.getDataStorageConfiguration().setWalArchivePath(DFLT_WAL_PATH); + }); + + AtomicInteger key = new AtomicInteger(); + + dbMgr(n).checkpointReadLock(); + + try { + fill(n, 6, key); + + // Protection against deleting WAL segments. + assertTrue(walMgr(n).reserve(new WALPointer(5, 0, 0))); + } + finally { + dbMgr(n).checkpointReadUnlock(); + } + + forceCheckpoint(); + assertTrue(waitForCondition(() -> walMgr(n).lastTruncatedSegment() == 4, getTestTimeout())); + + // Guaranteed recovery from WAL segments. + dbMgr(n).enableCheckpoints(false).get(getTestTimeout()); + + fill(n, 2, key); + + stopAllGrids(); + + IgniteEx n0 = startGrid(0, cfg -> { + cfg.getDataStorageConfiguration().setWalSegments(segments); + }); + + assertEquals(key.get(), n0.cache(DEFAULT_CACHE_NAME).size()); + } + + /** + * Checking the consistency of recovery from a WAL when switching + * WAL archive off -> on and changing the number of segments on node restart. + * Without truncate WAL segments. + * + * @param segments Segment count on node restart. + * @throws Exception If failed. + */ + private void checkRecoveryWithoutWalTruncate(int segments) throws Exception { + IgniteEx n = startGrid(0, cfg -> { + cfg.getDataStorageConfiguration().setWalArchivePath(DFLT_WAL_PATH); + }); + + // Protection against deleting WAL segments. + assertTrue(walMgr(n).reserve(new WALPointer(0, 0, 0))); + + AtomicInteger key = new AtomicInteger(); + + fill(n, 3, key); + forceCheckpoint(); + + // Guaranteed recovery from WAL segments. + dbMgr(n).enableCheckpoints(false).get(getTestTimeout()); + + fill(n, 3, key); + + stopAllGrids(); + + n = startGrid(0, cfg -> { + cfg.getDataStorageConfiguration().setWalSegments(segments); + }); + + assertEquals(key.get(), n.cache(DEFAULT_CACHE_NAME).size()); + } + + /** + * Filling the cache until N WAL segments are created. + * + * @param n Node. + * @param segments Number of segments. + * @param key Key counter. + */ + private void fill(IgniteEx n, int segments, AtomicInteger key) { + long end = walMgr(n).currentSegment() + segments; + int i = 0; + + while (walMgr(n).currentSegment() < end) { + int k = key.getAndIncrement(); + int[] arr = new int[64]; + + Arrays.fill(arr, k); + + n.cache(DEFAULT_CACHE_NAME).put(key, arr); + + i++; + } + + if (log.isInfoEnabled()) { + log.info("Fill [keys=" + i + ", totalKeys=" + key.get() + + ", segNum=" + segments + ", currSeg=" + walMgr(n).currentSegment() + ']'); + } + } + + /** + * Getting WAL manager of node. + * + * @param n Node. + * @return WAL manager. + */ + private FileWriteAheadLogManager walMgr(IgniteEx n) { + return (FileWriteAheadLogManager)n.context().cache().context().wal(); + } + + /** + * Getting db manager of node. + * + * @param n Node. + * @return Db manager. + */ + private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) { + return (GridCacheDatabaseSharedManager)n.context().cache().context().database(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index d286c126..a0a8830 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -1038,6 +1038,22 @@ public abstract class GridAbstractTest extends JUnitAssertAware { * Starts new grid with given index. * * @param idx Index of the grid to start. + * @param cfgC Configuration mutator. Can be used to avoid oversimplification of {@link #getConfiguration()}. + * @return Started grid. + * @throws Exception If anything failed. + */ + protected IgniteEx startGrid(int idx, Consumer<IgniteConfiguration> cfgC) throws Exception { + return startGrid(getTestIgniteInstanceName(idx), cfg -> { + cfgC.accept(cfg); + + return cfg; + }); + } + + /** + * Starts new grid with given index. + * + * @param idx Index of the grid to start. * @param cfgOp Configuration mutator. Can be used to avoid overcomplification of {@link #getConfiguration()}. * @return Started grid. * @throws Exception If anything failed. diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java index ad60059..75aaed9 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite.java @@ -63,6 +63,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.CpTriggeredWa import org.apache.ignite.internal.processors.cache.persistence.wal.ExplicitWalDeltaConsistencyTest; import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBufferTest; import org.apache.ignite.internal.processors.cache.persistence.wal.SysPropWalDeltaConsistencyTest; +import org.apache.ignite.internal.processors.cache.persistence.wal.WalArchiveConsistencyTest; import org.apache.ignite.internal.processors.cache.persistence.wal.WalEnableDisableWithNodeShutdownTest; import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAwareTest; import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationPersistentTest; @@ -137,6 +138,8 @@ public class IgnitePdsTestSuite { GridTestUtils.addTestIfNeeded(suite, SegmentAwareTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, WalArchiveConsistencyTest.class, ignoredTests); + return suite; }