This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 49f4086 IGNITE-13569 disable archiving + walCompactionEnabled probably broke reading from wal on server restart - Fixes #8344. 49f4086 is described below commit 49f4086a2b97857bd45bea107510210fcab72cdb Author: Anton Kalashnikov <kaa....@yandex.ru> AuthorDate: Fri Oct 16 16:56:59 2020 +0300 IGNITE-13569 disable archiving + walCompactionEnabled probably broke reading from wal on server restart - Fixes #8344. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../persistence/wal/FileWriteAheadLogManager.java | 36 ++++-- .../db/wal/WalCompactionNoArchiverTest.java | 135 +++++++++++++++++++++ .../ignite/testsuites/IgnitePdsMvccTestSuite2.java | 2 + .../ignite/testsuites/IgnitePdsTestSuite2.java | 2 + 4 files changed, 164 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java index 6e73141..b79d637 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -484,17 +484,19 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl segmentAware = new SegmentAware(dsCfg.getWalSegments(), dsCfg.isWalCompactionEnabled()); - if (isArchiverEnabled()) - archiver = new FileArchiver(segmentAware, log); - else - archiver = null; - + // We have to initialize compressor before archiver in order to setup already compressed segments. + // Otherwise, FileArchiver initialization will trigger redundant work for FileCompressor. if (dsCfg.isWalCompactionEnabled()) { compressor = new FileCompressor(log); decompressor = new FileDecompressor(log); } + if (isArchiverEnabled()) + archiver = new FileArchiver(segmentAware, log); + else + archiver = null; + segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg); fileHandleManager = fileHandleManagerFactory.build( @@ -2072,6 +2074,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ FileCompressor(IgniteLogger log) { super(0, log); + + initAlreadyCompressedSegments(); } /** */ @@ -2085,11 +2089,6 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl f.delete(); } - FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); - - if (alreadyCompressed.length > 0) - segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 1].idx()); - for (int i = 1; i < calculateThreadCount(); i++) { FileCompressorWorker worker = new FileCompressorWorker(i, log); @@ -2102,6 +2101,16 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } /** + * Checks if there are already compressed segments and assigns counters if needed. + */ + private void initAlreadyCompressedSegments() { + FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); + + if (alreadyCompressed.length > 0) + segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 1].idx()); + } + + /** * Calculate optimal additional compressor worker threads count. If quarter of proc threads greater * than WAL_COMPRESSOR_WORKER_THREAD_CNT, use this value. Otherwise, reduce number of threads. * @@ -2148,6 +2157,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl /** */ private class FileCompressorWorker extends GridWorker { + /** Last compression error. */ + private volatile Throwable lastCompressionError; + /** */ FileCompressorWorker(int idx, IgniteLogger log) { super(cctx.igniteInstanceName(), "wal-file-compressor-%" + cctx.igniteInstanceName() + "%-" + idx, log); @@ -2228,8 +2240,10 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl Thread.currentThread().interrupt(); } catch (IgniteCheckedException | IOException e) { + lastCompressionError = e; + U.error(log, "Compression of WAL segment [idx=" + segIdx + - "] was skipped due to unexpected error", e); + "] was skipped due to unexpected error", lastCompressionError); segmentAware.onSegmentCompressed(segIdx); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java new file mode 100644 index 0000000..4c50e25 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.internal.processors.cache.persistence.db.wal; + +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.junits.WithSystemProperty; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.junit.Test; + +/** + * Checks that WAL compaction works correctly in no-archiver mode. + */ +public class WalCompactionNoArchiverTest extends GridCommonAbstractTest { + /** Wal segment size. */ + private static final int WAL_SEGMENT_SIZE = 4 * 1024 * 1024; + + /** Cache name. */ + public static final String CACHE_NAME = "cache"; + + /** Entries count. */ + public static final int ENTRIES = 1000; + + /** WAL path. */ + public static final String WAL_PATH = "no-arch"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String name) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(name); + + cfg.setDataStorageConfiguration(new DataStorageConfiguration() + .setDefaultDataRegionConfiguration(new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(200L * 1024 * 1024)) + .setWalSegmentSize(WAL_SEGMENT_SIZE) + .setWalCompactionEnabled(true) + .setWalArchivePath(WAL_PATH) + .setWalPath(WAL_PATH) + .setCheckpointFrequency(1000)); + + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setName(CACHE_NAME); + ccfg.setAffinity(new RendezvousAffinityFunction(false, 16)); + + cfg.setCacheConfiguration(ccfg); + cfg.setConsistentId(name); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), WAL_PATH, true)); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), WAL_PATH, true)); + } + + /** + * Tests that attempts to compress WAL segment don't result with error. + */ + @Test + @WithSystemProperty(key = "IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT", value = "1") + public void testNoCompressionErrors() throws Exception { + IgniteEx ig = startGrid(0); + ig.cluster().active(true); + + IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME); + + for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total. + final byte[] val = new byte[40000]; + + val[i] = 1; + + cache.put(i, val); + } + + stopAllGrids(); + + ig = startGrid(0); + + cache = ig.cache(CACHE_NAME); + + for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in total. + final byte[] val = new byte[40000]; + + val[i] = 1; + + cache.put(i, val); + } + + IgniteWriteAheadLogManager wal = ig.context().cache().context().wal(); + + Object compressor = U.field(wal, "compressor"); + + assertNotNull(compressor); + + Object error = U.field(compressor, "lastCompressionError"); + + if (error != null) + fail("Unexpected error in FileCompressor: " + error); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java index a17e889..3c92319 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java @@ -38,6 +38,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalI import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionNoArchiverTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRolloverTypesTest; @@ -88,6 +89,7 @@ public class IgnitePdsMvccTestSuite2 { ignoredTests.add(IgniteUidAsConsistentIdMigrationTest.class); ignoredTests.add(IgniteWalSerializerVersionTest.class); ignoredTests.add(WalCompactionTest.class); + ignoredTests.add(WalCompactionNoArchiverTest.class); ignoredTests.add(WalCompactionSwitchOnTest.class); ignoredTests.add(IgniteWalIteratorSwitchSegmentTest.class); ignoredTests.add(IgniteWalIteratorExceptionDuringReadTest.class); diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index 00fc33a..8ff45e4 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -74,6 +74,7 @@ import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalR import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoverySeveralRestartsTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalReplayingAfterRestartTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest; +import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionNoArchiverTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveFsyncTest; @@ -201,6 +202,7 @@ public class IgnitePdsTestSuite2 { GridTestUtils.addTestIfNeeded(suite, IgniteUidAsConsistentIdMigrationTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, IgniteWalSerializerVersionTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, WalCompactionTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, WalCompactionNoArchiverTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, WalCompactionSwitchOnTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, WalDeletionArchiveFsyncTest.class, ignoredTests); GridTestUtils.addTestIfNeeded(suite, WalDeletionArchiveLogOnlyTest.class, ignoredTests);