IGNITE-8946 AssertionError can occur during release of WAL history that was 
reserved for historical rebalance


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/54055ec0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/54055ec0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/54055ec0

Branch: refs/heads/ignite-8446
Commit: 54055ec0c09caadf939e00a3a86098f610b52d8c
Parents: ee909a3
Author: Ivan Rakov <ira...@apache.org>
Authored: Wed Jul 11 18:45:31 2018 +0300
Committer: Ivan Rakov <ira...@apache.org>
Committed: Wed Jul 11 18:45:31 2018 +0300

----------------------------------------------------------------------
 .../checkpoint/CheckpointHistory.java           |  35 +--
 ...SlowHistoricalRebalanceSmallHistoryTest.java | 236 +++++++++++++++++++
 .../ignite/testsuites/IgnitePdsTestSuite2.java  |   3 +
 3 files changed, 257 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/54055ec0/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
index cef2093..95f150b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointHistory.java
@@ -326,26 +326,13 @@ public class CheckpointHistory {
                 if (!reserved)
                     break;
 
-                for (Integer grpId : groupsAndPartitions.keySet())
+                for (Integer grpId : new 
HashSet<>(groupsAndPartitions.keySet()))
                     if (!isCheckpointApplicableForGroup(grpId, chpEntry))
                         groupsAndPartitions.remove(grpId);
 
-                // All groups are no more applicable, release history and stop 
searching.
-                if (groupsAndPartitions.isEmpty()) {
-                    cctx.wal().release(chpEntry.checkpointMark());
-
-                    break;
-                }
-
-                // Release previous checkpoint marker.
-                if (prevReserved != null)
-                    cctx.wal().release(prevReserved.checkpointMark());
-
-                prevReserved = chpEntry;
-
                 for (Map.Entry<Integer, CheckpointEntry.GroupState> state : 
chpEntry.groupState(cctx).entrySet()) {
                     int grpId = state.getKey();
-                    CheckpointEntry.GroupState cpGroupState = state.getValue();
+                    CheckpointEntry.GroupState cpGrpState = state.getValue();
 
                     Set<Integer> applicablePartitions = 
groupsAndPartitions.get(grpId);
 
@@ -355,7 +342,7 @@ public class CheckpointHistory {
                     Set<Integer> inapplicablePartitions = null;
 
                     for (Integer partId : applicablePartitions) {
-                        int pIdx = cpGroupState.indexByPartition(partId);
+                        int pIdx = cpGrpState.indexByPartition(partId);
 
                         if (pIdx >= 0)
                             res.computeIfAbsent(grpId, k -> new 
HashMap<>()).put(partId, chpEntry);
@@ -374,9 +361,23 @@ public class CheckpointHistory {
                 }
 
                 // Remove groups from search with empty set of applicable 
partitions.
-                for (Map.Entry<Integer, Set<Integer>> e : 
groupsAndPartitions.entrySet())
+                for (Map.Entry<Integer, Set<Integer>> e : new 
HashSet<>(groupsAndPartitions.entrySet()))
                     if (e.getValue().isEmpty())
                         groupsAndPartitions.remove(e.getKey());
+
+                // All groups are no more applicable, release history and stop 
searching.
+                if (groupsAndPartitions.isEmpty()) {
+                    cctx.wal().release(chpEntry.checkpointMark());
+
+                    break;
+                }
+                else {
+                    // Release previous checkpoint marker.
+                    if (prevReserved != null)
+                        cctx.wal().release(prevReserved.checkpointMark());
+
+                    prevReserved = chpEntry;
+                }
             }
             catch (IgniteCheckedException ex) {
                 U.error(log, "Failed to process checkpoint: " + (chpEntry != 
null ? chpEntry : "none"), ex);

http://git-wip-us.apache.org/repos/asf/ignite/blob/54055ec0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java
new file mode 100644
index 0000000..8f2e738
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/SlowHistoricalRebalanceSmallHistoryTest.java
@@ -0,0 +1,236 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*      http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.ignite.internal.processors.cache.persistence.db;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.GridCacheGroupIdMessage;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ *
+ */
+public class SlowHistoricalRebalanceSmallHistoryTest extends 
GridCommonAbstractTest {
+    /** Ip finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** Slow rebalance cache name. */
+    private static final String SLOW_REBALANCE_CACHE = "b13813ce";
+
+    /** Regular cache name. */
+    private static final String REGULAR_CACHE = "another-cache";
+
+    /** Supply message latch. */
+    private static final AtomicReference<CountDownLatch> SUPPLY_MESSAGE_LATCH 
= new AtomicReference<>();
+
+    /** Wal history size. */
+    private static final int WAL_HISTORY_SIZE = 5;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(name);
+
+        cfg.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
+
+        cfg.setDataStorageConfiguration(
+            new DataStorageConfiguration()
+                .setWalHistorySize(WAL_HISTORY_SIZE)
+                .setDefaultDataRegionConfiguration(
+                    new DataRegionConfiguration()
+                        .setPersistenceEnabled(true)
+                )
+                .setWalSegmentSize(512 * 1024)
+        );
+
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+
+        cfg.setCommunicationSpi(new RebalanceBlockingSPI());
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        SUPPLY_MESSAGE_LATCH.set(new CountDownLatch(1));
+
+        
System.setProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 
"1000");
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        SUPPLY_MESSAGE_LATCH.get().countDown();
+
+        SUPPLY_MESSAGE_LATCH.set(null);
+
+        
System.clearProperty(IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD);
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Checks that we reserve and release the same WAL index on exchange.
+     */
+    public void testReservation() throws Exception {
+        IgniteEx ig = startGrid(0);
+
+        ig.cluster().active(true);
+
+        ig.getOrCreateCache(new CacheConfiguration<>()
+            .setName(SLOW_REBALANCE_CACHE)
+            .setAffinity(new RendezvousAffinityFunction(false, 1))
+            .setBackups(1)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setRebalanceBatchSize(100));
+
+        try (IgniteDataStreamer<Object, Object> streamer = ig.dataStreamer(SLOW_REBALANCE_CACHE)) {
+            for (int i = 0; i < 3_000; i++)
+                streamer.addData(i, new byte[5 * 1000]);
+
+            streamer.flush();
+        }
+
+        startGrid(1); // Supply messages for SLOW_REBALANCE_CACHE stay blocked on SUPPLY_MESSAGE_LATCH.
+
+        resetBaselineTopology();
+
+        IgniteCache<Object, Object> anotherCache = ig.getOrCreateCache(new CacheConfiguration<>()
+            .setName(REGULAR_CACHE)
+            .setAffinity(new RendezvousAffinityFunction(false, 1))
+            .setBackups(1)
+            .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setRebalanceBatchSize(100));
+
+        Thread.sleep(7_000); // To let distributedExchange() finish.
+
+        for (int i = 0; i < WAL_HISTORY_SIZE; i++) { // One checkpoint per iteration, WAL_HISTORY_SIZE in total.
+            for (int j = 0; j < 500; j++) // Inner loops must advance j, not the outer counter i.
+                anotherCache.put(j, new byte[5 * 1000]);
+
+            forceCheckpoint(); // Checkpoints where partition is OWNING on grid(0), MOVING on grid(1)
+
+            for (int j = 0; j < 500; j++)
+                anotherCache.put(j, new byte[5 * 1000]);
+        }
+
+        SUPPLY_MESSAGE_LATCH.get().countDown(); // Unblock supply messages for the slow cache.
+
+        waitForRebalancing(); // Partition is OWNING on grid(0) and grid(1)
+
+        for (int i = 0; i < 2; i++) {
+            for (int j = 0; j < 500; j++)
+                anotherCache.put(j, new byte[5 * 1000]);
+
+            forceCheckpoint(); // A few more checkpoints when partition is OWNING everywhere
+
+            for (int j = 0; j < 500; j++)
+                anotherCache.put(j, new byte[5 * 1000]);
+        }
+
+        stopGrid(0);
+
+        IgniteCache<Object, Object> anotherCacheGrid1 = grid(1).cache(REGULAR_CACHE);
+
+        for (int i = 0; i < 500; i++) // Updates that grid(0) misses while it is stopped.
+            anotherCacheGrid1.put(i, new byte[5 * 1000]);
+
+        startGrid(0);
+
+        waitForRebalancing();
+
+        assertEquals(2, grid(1).context().discovery().aliveServerNodes().size()); // Both nodes must still be alive.
+    }
+
+    /**
+     * Communication SPI that holds back {@link GridDhtPartitionSupplyMessage}s of the
+     * {@link #SLOW_REBALANCE_CACHE} group until the latch in {@link #SUPPLY_MESSAGE_LATCH} is released.
+     */
+    private static class RebalanceBlockingSPI extends TcpCommunicationSpi {
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+
+                if (grpId == CU.cacheId(SLOW_REBALANCE_CACHE)) {
+                    CountDownLatch latch0 = SUPPLY_MESSAGE_LATCH.get();
+
+                    if (latch0 != null)
+                        try {
+                            latch0.await();
+                        }
+                        catch (InterruptedException ex) {
+                            Thread.currentThread().interrupt(); // Keep the interrupt visible to callers.
+                            throw new IgniteException(ex);
+                        }
+                }
+            }
+
+            super.sendMessage(node, msg);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg,
+            IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
+            if (msg instanceof GridIoMessage && ((GridIoMessage)msg).message() instanceof GridDhtPartitionSupplyMessage) {
+                int grpId = ((GridCacheGroupIdMessage)((GridIoMessage)msg).message()).groupId();
+
+                if (grpId == CU.cacheId(SLOW_REBALANCE_CACHE)) {
+                    CountDownLatch latch0 = SUPPLY_MESSAGE_LATCH.get();
+
+                    if (latch0 != null)
+                        try {
+                            latch0.await();
+                        }
+                        catch (InterruptedException ex) {
+                            Thread.currentThread().interrupt(); // Keep the interrupt visible to callers.
+                            throw new IgniteException(ex);
+                        }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/54055ec0/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
index 1b25b0f..529fb60 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java
@@ -39,6 +39,7 @@ import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsPageE
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsRebalancingOnNotStableTopologyTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsTransactionsHangTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsWholeClusterRestartTest;
+import 
org.apache.ignite.internal.processors.cache.persistence.db.SlowHistoricalRebalanceSmallHistoryTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.checkpoint.IgniteCheckpointDirtyPagesForLowLoadTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.IgnitePdsUnusedWalSegmentsTest;
 import 
org.apache.ignite.internal.processors.cache.persistence.db.filename.IgniteUidAsConsistentIdMigrationTest;
@@ -142,6 +143,8 @@ public class IgnitePdsTestSuite2 extends TestSuite {
         // Rebalancing test
         suite.addTestSuite(IgniteWalHistoryReservationsTest.class);
 
+        suite.addTestSuite(SlowHistoricalRebalanceSmallHistoryTest.class);
+
         suite.addTestSuite(IgnitePersistentStoreDataStructuresTest.class);
 
         // Failover test

Reply via email to