This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 5223e44 IGNITE-13910 Missing segment is not released - Fixes #8612. 5223e44 is described below commit 5223e4437f60dafc09b5a90fb2e6733640783c04 Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Fri Dec 25 15:33:20 2020 +0300 IGNITE-13910 Missing segment is not released - Fixes #8612. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../persistence/wal/FileWriteAheadLogManager.java | 30 ++-- .../apache/ignite/internal/util/IgniteUtils.java | 3 + .../db/wal/WriteAheadLogManagerSelfTest.java | 175 +++++++++++++++++++++ .../testframework/junits/GridAbstractTest.java | 2 +- .../ignite/testsuites/IgnitePdsTestSuite2.java | 3 + 5 files changed, 198 insertions(+), 15 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 4b97487..4a1d968 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -584,18 +584,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** - * Collect wal segment files from low pointer (include) to high pointer (not include) and reserve low pointer. + * Collects WAL segments from the archive only if they are all present. + * Will wait for the last segment to be archived if it is not. + * If there are missing segments an empty collection is returned. * - * @param low Low bound. - * @param high High bound. + * @param low Low bound (include). + * @param high High bound (not include). + * @return WAL segments from the archive, or an empty collection if at + * least a segment between {@code low} and {@code high} is missing. + * @throws IgniteCheckedException If failed. */ - public Collection<File> getAndReserveWalFiles(WALPointer low, WALPointer high) throws IgniteCheckedException { - final long awaitIdx = high.index() - 1; - - segmentAware.awaitSegmentArchived(awaitIdx); - - if (!reserve(low)) - throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]"); + public Collection<File> getWalFilesFromArchive( + WALPointer low, + WALPointer high + ) throws IgniteCheckedException { + segmentAware.awaitSegmentArchived(high.index() - 1); List<File> res = new ArrayList<>(); @@ -610,11 +613,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl else if (fileZip.exists()) res.add(fileZip); else { - if (log.isInfoEnabled()) { + if (log.isInfoEnabled()) log.info("Segment not found: " + file.getName() + "/" + fileZip.getName()); - log.info("Stopped iteration on idx: " + i); - } + res.clear(); break; } @@ -1008,7 +1010,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl // Segment presence check. if (reserved && !hasIndex(start.index())) { - segmentAware.reserve(start.index()); + segmentAware.release(start.index()); reserved = false; } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 5f5aafe..5d5c360 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -308,6 +308,9 @@ import static org.apache.ignite.internal.util.GridUnsafe.staticFieldOffset; @SuppressWarnings({"UnusedReturnValue"}) public abstract class IgniteUtils { /** */ + public static final long KB = 1024L; + + /** */ public static final long MB = 1024L * 1024; /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java new file mode 100644 index 0000000..e748d88 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; +import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +import static org.apache.ignite.testframework.GridTestUtils.assertThrows; +import static org.apache.ignite.testframework.GridTestUtils.runAsync; + +/** + * Class for testing WAL manager. + */ +public class WriteAheadLogManagerSelfTest extends GridCommonAbstractTest { + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + cleanPersistenceDir(); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME)) + .setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true)) + ); + } + + /** {@inheritDoc} */ + @Override protected IgniteEx startGrids(int cnt) throws Exception { + IgniteEx n = super.startGrids(cnt); + + n.cluster().state(ClusterState.ACTIVE); + awaitPartitionMapExchange(); + + return n; + } + + /** + * Checking the correctness of WAL segment reservation. + * + * @throws Exception If failed. + */ + @Test + public void testReservation() throws Exception { + IgniteEx n = startGrids(1); + + for (int i = 0; walMgr(n).lastArchivedSegment() < 2; i++) + n.cache(DEFAULT_CACHE_NAME).put(i, new byte[(int)(10 * U.KB)]); + + forceCheckpoint(); + + assertTrue(walMgr(n).lastArchivedSegment() >= 2); + assertTrue(walMgr(n).lastTruncatedSegment() == -1); + + WALPointer segment0WalPtr = new WALPointer(0, 0, 0); + assertTrue(walMgr(n).reserve(segment0WalPtr)); + assertTrue(walMgr(n).reserved(segment0WalPtr)); + + WALPointer segment1WalPtr = new WALPointer(1, 0, 0); + + // Delete segment manually. + FileDescriptor segment1 = Arrays.stream(walMgr(n).walArchiveFiles()) + .filter(fd -> fd.idx() == segment1WalPtr.index()).findAny().orElseThrow(AssertionError::new); + + assertTrue(segment1.file().delete()); + + assertFalse(walMgr(n).reserve(segment1WalPtr)); + assertTrue(walMgr(n).reserved(segment1WalPtr)); + + walMgr(n).release(segment0WalPtr); + assertFalse(walMgr(n).reserved(segment0WalPtr)); + assertFalse(walMgr(n).reserved(segment1WalPtr)); + + assertEquals(1, walMgr(n).truncate(segment1WalPtr)); + assertFalse(walMgr(n).reserve(segment0WalPtr)); + assertFalse(walMgr(n).reserve(segment1WalPtr)); + assertFalse(walMgr(n).reserved(segment0WalPtr)); + assertFalse(walMgr(n).reserved(segment1WalPtr)); + + WALPointer segmentMaxWalPtr = new WALPointer(Long.MAX_VALUE, 0, 0); + assertFalse(walMgr(n).reserve(segmentMaxWalPtr)); + assertFalse(walMgr(n).reserved(segmentMaxWalPtr)); + } + + /** + * Checking the correctness of the method {@link FileWriteAheadLogManager#getWalFilesFromArchive}. + * + * @throws Exception If failed. + */ + @Test + public void testGetWalFilesFromArchive() throws Exception { + IgniteEx n = startGrids(1); + + WALPointer segment0WalPtr = new WALPointer(0, 0, 0); + WALPointer segment1WalPtr = new WALPointer(1, 0, 0); + WALPointer segment2WalPtr = new WALPointer(2, 0, 0); + + CountDownLatch startLatch = new CountDownLatch(1); + IgniteInternalFuture<Collection<File>> fut = runAsync(() -> { + startLatch.countDown(); + + return walMgr(n).getWalFilesFromArchive(segment0WalPtr, segment2WalPtr); + }); + + startLatch.await(); + + // Check that the expected archiving segment 1. + assertThrows(log, () -> fut.get(1_000), IgniteCheckedException.class, null); + + for (int i = 0; walMgr(n).lastArchivedSegment() < 2; i++) + n.cache(DEFAULT_CACHE_NAME).put(i, new byte[(int)(10 * U.KB)]); + + assertEquals(2, fut.get(getTestTimeout()).size()); + + forceCheckpoint(); + + assertEquals(1, walMgr(n).truncate(segment1WalPtr)); + assertEquals(0, walMgr(n).getWalFilesFromArchive(segment0WalPtr, segment2WalPtr).size()); + assertEquals(1, walMgr(n).getWalFilesFromArchive(segment1WalPtr, segment2WalPtr).size()); + } + + /** + * Getting WAL manager. + * + * @param n Node. + * @return WAL manager. + */ + private FileWriteAheadLogManager walMgr(IgniteEx n) { + return (FileWriteAheadLogManager)n.context().cache().context().wal(); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index d2f044b..d286c126 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -827,7 +827,7 @@ public abstract class GridAbstractTest extends JUnitAssertAware { * @return First started grid. * @throws Exception If failed. */ - protected final IgniteEx startGrids(int cnt) throws Exception { + protected IgniteEx startGrids(int cnt) throws Exception { assert cnt > 0; IgniteEx ignite = null; diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 5510c4b..2e8593f 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -82,6 +82,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompact import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveFsyncTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveLogOnlyTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRolloverTypesTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WriteAheadLogManagerSelfTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteDataIntegrityTests; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgniteFsyncReplayWalIteratorInvalidCrcTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.crc.IgnitePureJavaCrcCompatibility; @@ -237,5 +238,7 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, HistoricalRebalanceHeuristicsTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteLocalWalSizeTest.class, ignoredTests); + + GridTestUtils.addTestIfNeeded(suite, WriteAheadLogManagerSelfTest.class, ignoredTests); } }