This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 49f4086  IGNITE-13569 disable archiving + walCompactionEnabled 
probably broke reading from wal on server restart - Fixes #8344.
49f4086 is described below

commit 49f4086a2b97857bd45bea107510210fcab72cdb
Author: Anton Kalashnikov <kaa....@yandex.ru>
AuthorDate: Fri Oct 16 16:56:59 2020 +0300

    IGNITE-13569 disable archiving + walCompactionEnabled probably broke 
reading from wal on server restart - Fixes #8344.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../persistence/wal/FileWriteAheadLogManager.java  |  36 ++++--
 .../db/wal/WalCompactionNoArchiverTest.java        | 135 +++++++++++++++++++++
 .../ignite/testsuites/IgnitePdsMvccTestSuite2.java |   2 +
 .../ignite/testsuites/IgnitePdsTestSuite2.java     |   2 +
 4 files changed, 164 insertions(+), 11 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 6e73141..b79d637 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -484,17 +484,19 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
             segmentAware = new SegmentAware(dsCfg.getWalSegments(), 
dsCfg.isWalCompactionEnabled());
 
-            if (isArchiverEnabled())
-                archiver = new FileArchiver(segmentAware, log);
-            else
-                archiver = null;
-
+            // We have to initialize compressor before archiver in order to 
setup already compressed segments.
+            // Otherwise, FileArchiver initialization will trigger redundant 
work for FileCompressor.
             if (dsCfg.isWalCompactionEnabled()) {
                 compressor = new FileCompressor(log);
 
                 decompressor = new FileDecompressor(log);
             }
 
+            if (isArchiverEnabled())
+                archiver = new FileArchiver(segmentAware, log);
+            else
+                archiver = null;
+
             segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, 
segmentAware, dsCfg);
 
             fileHandleManager = fileHandleManagerFactory.build(
@@ -2072,6 +2074,8 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         /** */
         FileCompressor(IgniteLogger log) {
             super(0, log);
+
+            initAlreadyCompressedSegments();
         }
 
         /** */
@@ -2085,11 +2089,6 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                 f.delete();
             }
 
-            FileDescriptor[] alreadyCompressed = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
-
-            if (alreadyCompressed.length > 0)
-                
segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 
1].idx());
-
             for (int i = 1; i < calculateThreadCount(); i++) {
                 FileCompressorWorker worker = new FileCompressorWorker(i, log);
 
@@ -2102,6 +2101,16 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
         }
 
         /**
+         * Checks if there are already compressed segments and assigns 
counters if needed.
+         */
+        private void initAlreadyCompressedSegments() {
+            FileDescriptor[] alreadyCompressed = 
scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
+
+            if (alreadyCompressed.length > 0)
+                
segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 
1].idx());
+        }
+
+        /**
          * Calculate optimal additional compressor worker threads count. If 
quarter of proc threads greater
          * than WAL_COMPRESSOR_WORKER_THREAD_CNT, use this value. Otherwise, 
reduce number of threads.
          *
@@ -2148,6 +2157,9 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
 
     /** */
     private class FileCompressorWorker extends GridWorker {
+        /** Last compression error. */
+        private volatile Throwable lastCompressionError;
+
         /** */
         FileCompressorWorker(int idx, IgniteLogger log) {
             super(cctx.igniteInstanceName(), "wal-file-compressor-%" + 
cctx.igniteInstanceName() + "%-" + idx, log);
@@ -2228,8 +2240,10 @@ public class FileWriteAheadLogManager extends 
GridCacheSharedManagerAdapter impl
                     Thread.currentThread().interrupt();
                 }
                 catch (IgniteCheckedException | IOException e) {
+                    lastCompressionError = e;
+
                     U.error(log, "Compression of WAL segment [idx=" + segIdx +
-                            "] was skipped due to unexpected error", e);
+                            "] was skipped due to unexpected error", 
lastCompressionError);
 
                     segmentAware.onSegmentCompressed(segIdx);
                 }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java
new file mode 100644
index 0000000..4c50e25
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WalCompactionNoArchiverTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence.db.wal;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.testframework.junits.WithSystemProperty;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+/**
+ * Checks that WAL compaction works correctly in no-archiver mode.
+ */
+public class WalCompactionNoArchiverTest extends GridCommonAbstractTest {
+    /** Wal segment size. */
+    private static final int WAL_SEGMENT_SIZE = 4 * 1024 * 1024;
+
+    /** Cache name. */
+    public static final String CACHE_NAME = "cache";
+
+    /** Entries count. */
+    public static final int ENTRIES = 1000;
+
+    /** WAL path. */
+    public static final String WAL_PATH = "no-arch";
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                .setPersistenceEnabled(true)
+                .setMaxSize(200L * 1024 * 1024))
+            .setWalSegmentSize(WAL_SEGMENT_SIZE)
+            .setWalCompactionEnabled(true)
+            .setWalArchivePath(WAL_PATH)
+            .setWalPath(WAL_PATH)
+            .setCheckpointFrequency(1000));
+
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setName(CACHE_NAME);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, 16));
+
+        cfg.setCacheConfiguration(ccfg);
+        cfg.setConsistentId(name);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), WAL_PATH, 
true));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        U.delete(U.resolveWorkDirectory(U.defaultWorkDirectory(), WAL_PATH, 
true));
+    }
+
+    /**
+     * Tests that attempts to compress WAL segment don't result with error.
+     */
+    @Test
+    @WithSystemProperty(key = "IGNITE_WAL_COMPRESSOR_WORKER_THREAD_CNT", value 
= "1")
+    public void testNoCompressionErrors() throws Exception {
+        IgniteEx ig = startGrid(0);
+        ig.cluster().active(true);
+
+        IgniteCache<Integer, byte[]> cache = ig.cache(CACHE_NAME);
+
+        for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in 
total.
+            final byte[] val = new byte[40000];
+
+            val[i] = 1;
+
+            cache.put(i, val);
+        }
+
+        stopAllGrids();
+
+        ig = startGrid(0);
+
+        cache = ig.cache(CACHE_NAME);
+
+        for (int i = 0; i < ENTRIES; i++) { // At least 20MB of raw data in 
total.
+            final byte[] val = new byte[40000];
+
+            val[i] = 1;
+
+            cache.put(i, val);
+        }
+
+        IgniteWriteAheadLogManager wal = ig.context().cache().context().wal();
+
+        Object compressor = U.field(wal, "compressor");
+
+        assertNotNull(compressor);
+
+        Object error = U.field(compressor, "lastCompressionError");
+
+        if (error != null)
+            fail("Unexpected error in FileCompressor: " + error);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
index a17e889..3c92319 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsMvccTestSuite2.java
@@ -38,6 +38,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalI
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalIteratorSwitchSegmentTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRebalanceLoggingTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionNoArchiverTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRolloverTypesTest;
@@ -88,6 +89,7 @@ public class IgnitePdsMvccTestSuite2 {
         ignoredTests.add(IgniteUidAsConsistentIdMigrationTest.class);
         ignoredTests.add(IgniteWalSerializerVersionTest.class);
         ignoredTests.add(WalCompactionTest.class);
+        ignoredTests.add(WalCompactionNoArchiverTest.class);
         ignoredTests.add(WalCompactionSwitchOnTest.class);
         ignoredTests.add(IgniteWalIteratorSwitchSegmentTest.class);
         ignoredTests.add(IgniteWalIteratorExceptionDuringReadTest.class);
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 00fc33a..8ff45e4 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -74,6 +74,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalR
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalRecoverySeveralRestartsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalReplayingAfterRestartTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.IgniteWalSerializerVersionTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionNoArchiverTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionSwitchOnTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.wal.WalDeletionArchiveFsyncTest;
@@ -201,6 +202,7 @@ public class IgnitePdsTestSuite2 {
         GridTestUtils.addTestIfNeeded(suite, 
IgniteUidAsConsistentIdMigrationTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
IgniteWalSerializerVersionTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, WalCompactionTest.class, 
ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
WalCompactionNoArchiverTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, WalCompactionSwitchOnTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
WalDeletionArchiveFsyncTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
WalDeletionArchiveLogOnlyTest.class, ignoredTests);

Reply via email to