IGNITE-10505 Flag IGNITE_DISABLE_WAL_DURING_REBALANCING should be turned on by 
default - Fixes #5578.

Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca932821
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca932821
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca932821

Branch: refs/heads/ignite-601
Commit: ca932821a3236e3cbd4cbe17403dc040e19ee152
Parents: 28d8acc
Author: Sergey Chugunov <sergey.chugu...@gmail.com>
Authored: Thu Dec 27 11:45:00 2018 +0300
Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com>
Committed: Thu Dec 27 11:45:00 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/WalStateManager.java       |   2 +-
 ...teRebalanceScheduleResendPartitionsTest.java |   2 +-
 ...itePdsCacheWalDisabledOnRebalancingTest.java | 195 +++++++++++++++++--
 3 files changed, 184 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ca932821/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
index 0bcd07da..937f1f0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java
@@ -392,7 +392,7 @@ public class WalStateManager extends 
GridCacheSharedManagerAdapter {
     public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion 
topVer, boolean changedBaseline) {
         if (changedBaseline
             && 
IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED)
-            || 
!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING,
 false))
+            || 
!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING,
 true))
             return;
 
         Set<Integer> grpsToEnableWal = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca932821/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java
index d52bdd8..f9da942 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java
@@ -221,7 +221,7 @@ public class IgniteRebalanceScheduleResendPartitionsTest 
extends GridCommonAbstr
                 if (val1 == null)
                     prevEquals.set(false);
 
-                boolean equals = v0.map().equals(val1.map());
+                boolean equals = v0.map().equals(val1.map()) && 
(v0.topologyVersion().equals(val1.topologyVersion()));
 
                 prevEquals.set(prevEquals.get() && equals);
             });

http://git-wip-us.apache.org/repos/asf/ignite/blob/ca932821/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
index 0ecde09..46ac18b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java
@@ -17,17 +17,24 @@
 package org.apache.ignite.internal.processors.cache.persistence.db;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Files;
+import java.nio.file.OpenOption;
 import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.BiFunction;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -38,6 +45,10 @@ import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
+import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
+import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.mxbean.CacheGroupMetricsMXBean;
@@ -74,13 +85,23 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest 
extends GridCommonAbstra
     /** */
     private static final String CACHE3_NAME = "cache3";
 
+    /** Function to generate cache values. */
+    private static final BiFunction<String, Integer, String> GENERATING_FUNC = 
(s, i) -> s + "_value_" + i;
+
+    /** Flag to block rebalancing. */
+    private static final AtomicBoolean blockRebalanceEnabled = new 
AtomicBoolean(false);
+
+    /**  */
+    private static final Semaphore fileIoBlockingSemaphore = new 
Semaphore(Integer.MAX_VALUE);
+
+    /** */
+    private boolean useBlockingFileIO;
+
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
         cleanPersistenceDir();
-
-        
System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING,
 "true");
     }
 
     /** {@inheritDoc} */
@@ -88,8 +109,6 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest 
extends GridCommonAbstra
         stopAllGrids();
 
         cleanPersistenceDir();
-
-        
System.clearProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING);
     }
 
     /** {@inheritDoc} */
@@ -126,6 +145,9 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest 
extends GridCommonAbstra
                         .setPersistenceEnabled(true)
                         .setMaxSize(256 * 1024 * 1024));
 
+            if (useBlockingFileIO)
+                dsCfg.setFileIOFactory(new BlockingCheckpointFileIOFactory());
+
             cfg.setDataStorageConfiguration(dsCfg);
         }
 
@@ -148,8 +170,8 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest 
extends GridCommonAbstra
 
         ig0.active(true);
 
-        for (int i = 0; i < 3; i++)
-            fillCache(ig0.getOrCreateCache("cache" + i), CACHE_SIZE);
+        for (int i = 1; i < 4; i++)
+          fillCache(ig0.dataStreamer("cache" + i), CACHE_SIZE, 
GENERATING_FUNC);
 
         String ig1Name = "node01-" + grid(1).localNode().consistentId();
 
@@ -202,7 +224,7 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest 
extends GridCommonAbstra
     public void testServerNodesFromBltLeavesAndJoinsDuringRebalancing() throws 
Exception {
         Ignite ig0 = startGridsMultiThreaded(4);
 
-        fillCache(ig0.cache(CACHE3_NAME), CACHE_SIZE);
+        fillCache(ig0.dataStreamer(CACHE3_NAME), CACHE_SIZE, GENERATING_FUNC);
 
         List<Integer> nonAffinityKeys1 = nearKeys(grid(1).cache(CACHE3_NAME), 
100, CACHE_SIZE / 2);
         List<Integer> nonAffinityKeys2 = nearKeys(grid(2).cache(CACHE3_NAME), 
100, CACHE_SIZE / 2);
@@ -215,7 +237,7 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest 
extends GridCommonAbstra
         nonAffinityKeysSet.addAll(nonAffinityKeys1);
         nonAffinityKeysSet.addAll(nonAffinityKeys2);
 
-        fillCache(ig0.cache(CACHE3_NAME), nonAffinityKeysSet);
+        fillCache(ig0.dataStreamer(CACHE3_NAME), nonAffinityKeysSet, 
GENERATING_FUNC);
 
         int groupId = ((IgniteEx) ig0).cachex(CACHE3_NAME).context().groupId();
 
@@ -244,6 +266,128 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest 
extends GridCommonAbstra
             " partitions in MOVING state", allOwned);
     }
 
+    /**
+     * Scenario: when rebalanced MOVING partitions are owning by checkpointer,
+     * concurrent affinity change (caused by BLT change) may lead for 
additional partitions in MOVING state to appear.
+     *
+     * In such situation no partitions should be owned until new rebalancing 
process starts and finishes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testRebalancedPartitionsOwningWithConcurrentAffinityChange() 
throws Exception {
+        Ignite ig0 = startGridsMultiThreaded(4);
+        fillCache(ig0.dataStreamer(CACHE3_NAME), CACHE_SIZE, GENERATING_FUNC);
+
+        // Stop idx=2 to prepare for baseline topology change later.
+        stopGrid(2);
+
+        // Stop idx=1 and cleanup LFS to trigger full rebalancing after it 
restart.
+        String ig1Name = "node01-" + grid(1).localNode().consistentId();
+        stopGrid(1);
+        cleanPersistenceFiles(ig1Name);
+
+        // Blocking fileIO and blockMessagePredicate to block checkpointer and 
rebalancing for node idx=1.
+        useBlockingFileIO = true;
+        int groupId = ((IgniteEx) ig0).cachex(CACHE3_NAME).context().groupId();
+        blockMessagePredicate = (node, msg) -> {
+            if (blockRebalanceEnabled.get() && msg instanceof 
GridDhtPartitionDemandMessage)
+                return ((GridDhtPartitionDemandMessage) msg).groupId() == 
groupId;
+
+            return false;
+        };
+
+        // Enable blocking checkpointer on node idx=1 (see 
BlockingCheckpointFileIOFactory).
+        fileIoBlockingSemaphore.drainPermits();
+
+        IgniteEx ig1 = startGrid(1);
+
+        CacheGroupMetricsMXBean mxBean = 
ig1.cachex(CACHE3_NAME).context().group().mxBean();
+        int locMovingPartsNum = mxBean.getLocalNodeMovingPartitionsCount();
+
+        // Partitions remain in MOVING state even after PME and rebalancing 
when checkpointer is blocked.
+        assertTrue("Expected non-zero value for local moving partitions count 
on node idx = 1: " +
+            locMovingPartsNum, 0 < locMovingPartsNum && locMovingPartsNum < 
CACHE3_PARTS_NUM);
+
+        blockRebalanceEnabled.set(true);
+
+        // Change baseline topology and release checkpointer to verify
+        // that no partitions will be owned after affinity change.
+        
ig0.cluster().setBaselineTopology(ig1.context().discovery().topologyVersion());
+        fileIoBlockingSemaphore.release(Integer.MAX_VALUE);
+
+        locMovingPartsNum = mxBean.getLocalNodeMovingPartitionsCount();
+        assertTrue("Expected moving partitions count on node idx = 1 equals to 
all partitions of the cache " +
+             CACHE3_NAME + ": " + locMovingPartsNum, locMovingPartsNum == 
CACHE3_PARTS_NUM);
+
+        TestRecordingCommunicationSpi commSpi = 
(TestRecordingCommunicationSpi) ig1
+            .configuration().getCommunicationSpi();
+
+        // When we stop blocking demand message rebalancing should complete 
and all partitions should be owned.
+        commSpi.stopBlock();
+
+        boolean res = GridTestUtils.waitForCondition(
+            () -> mxBean.getLocalNodeMovingPartitionsCount() == 0, 15_000);
+
+        assertTrue("All partitions on node idx = 1 are expected to be owned", 
res);
+
+        verifyCache(ig1.cache(CACHE3_NAME), GENERATING_FUNC);
+    }
+
+    /** FileIOFactory implementation that enables blocking of writes to disk 
so checkpoint can be blocked. */
+    private static class BlockingCheckpointFileIOFactory implements 
FileIOFactory {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Delegate factory. */
+        private final FileIOFactory delegateFactory = new 
RandomAccessFileIOFactory();
+
+        /** {@inheritDoc} */
+        @Override public FileIO create(File file, OpenOption... modes) throws 
IOException {
+            FileIO delegate = delegateFactory.create(file, modes);
+
+            return new FileIODecorator(delegate) {
+                @Override public int write(ByteBuffer srcBuf) throws 
IOException {
+                    if 
(Thread.currentThread().getName().contains("checkpoint")) {
+                        try {
+                            fileIoBlockingSemaphore.acquire();
+                        }
+                        catch (InterruptedException ignored) {
+                            // No-op.
+                        }
+                    }
+
+                    return delegate.write(srcBuf);
+                }
+
+                @Override public int write(ByteBuffer srcBuf, long position) 
throws IOException {
+                    if 
(Thread.currentThread().getName().contains("checkpoint")) {
+                        try {
+                            fileIoBlockingSemaphore.acquire();
+                        }
+                        catch (InterruptedException ignored) {
+                            // No-op.
+                        }
+                    }
+
+                    return delegate.write(srcBuf, position);
+                }
+
+                @Override public int write(byte[] buf, int off, int len) 
throws IOException {
+                    if 
(Thread.currentThread().getName().contains("checkpoint")) {
+                        try {
+                            fileIoBlockingSemaphore.acquire();
+                        }
+                        catch (InterruptedException ignored) {
+                            // No-op.
+                        }
+                    }
+
+                    return delegate.write(buf, off, len);
+                }
+            };
+        }
+    }
+
     /** */
     private void cleanPersistenceFiles(String igName) throws Exception {
         String ig1DbPath = Paths.get(DFLT_STORE_DIR, igName).toString();
@@ -259,14 +403,39 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest 
extends GridCommonAbstra
     }
 
     /** */
-    private void fillCache(IgniteCache cache, int cacheSize) {
+    private void fillCache(
+        IgniteDataStreamer streamer,
+        int cacheSize,
+        BiFunction<String, Integer, String> generatingFunc
+    ) {
+        String name = streamer.cacheName();
+
         for (int i = 0; i < cacheSize; i++)
-            cache.put(i, "value_" + i);
+            streamer.addData(i, generatingFunc.apply(name, i));
     }
 
     /** */
-    private void fillCache(IgniteCache cache, Collection<Integer> keys) {
+    private void fillCache(
+        IgniteDataStreamer streamer,
+        Collection<Integer> keys,
+        BiFunction<String, Integer, String> generatingFunc
+    ) {
+        String cacheName = streamer.cacheName();
+
         for (Integer key : keys)
-            cache.put(key, "value_" + key);
+            streamer.addData(key, generatingFunc.apply(cacheName, key));
+    }
+
+    /** */
+    private void verifyCache(IgniteCache cache, BiFunction<String, Integer, 
String> generatingFunc) {
+        int size = cache.size(CachePeekMode.PRIMARY);
+
+        String cacheName = cache.getName();
+
+        for (int i = 0; i < size; i++) {
+            String value = (String) cache.get(i);
+
+            assertEquals(generatingFunc.apply(cacheName, i), value);
+        }
     }
 }

Reply via email to