IGNITE-10505 Flag IGNITE_DISABLE_WAL_DURING_REBALANCING should be turned on by default - Fixes #5578.
Signed-off-by: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ca932821 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ca932821 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ca932821 Branch: refs/heads/ignite-601 Commit: ca932821a3236e3cbd4cbe17403dc040e19ee152 Parents: 28d8acc Author: Sergey Chugunov <sergey.chugu...@gmail.com> Authored: Thu Dec 27 11:45:00 2018 +0300 Committer: Dmitriy Govorukhin <dmitriy.govoruk...@gmail.com> Committed: Thu Dec 27 11:45:00 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/WalStateManager.java | 2 +- ...teRebalanceScheduleResendPartitionsTest.java | 2 +- ...itePdsCacheWalDisabledOnRebalancingTest.java | 195 +++++++++++++++++-- 3 files changed, 184 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/ca932821/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java index 0bcd07da..937f1f0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/WalStateManager.java @@ -392,7 +392,7 @@ public class WalStateManager extends GridCacheSharedManagerAdapter { public void changeLocalStatesOnExchangeDone(AffinityTopologyVersion topVer, boolean changedBaseline) { if (changedBaseline && IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_PENDING_TX_TRACKER_ENABLED) - || 
!IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, false)) + || !IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, true)) return; Set<Integer> grpsToEnableWal = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/ignite/blob/ca932821/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java index d52bdd8..f9da942 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteRebalanceScheduleResendPartitionsTest.java @@ -221,7 +221,7 @@ public class IgniteRebalanceScheduleResendPartitionsTest extends GridCommonAbstr if (val1 == null) prevEquals.set(false); - boolean equals = v0.map().equals(val1.map()); + boolean equals = v0.map().equals(val1.map()) && (v0.topologyVersion().equals(val1.topologyVersion())); prevEquals.set(prevEquals.get() && equals); }); http://git-wip-us.apache.org/repos/asf/ignite/blob/ca932821/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java index 0ecde09..46ac18b 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/IgnitePdsCacheWalDisabledOnRebalancingTest.java @@ -17,17 +17,24 @@ package org.apache.ignite.internal.processors.cache.persistence.db; import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; +import java.nio.file.OpenOption; import java.nio.file.Paths; import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; @@ -38,6 +45,10 @@ import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIODecorator; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import 
org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; @@ -74,13 +85,23 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest extends GridCommonAbstra /** */ private static final String CACHE3_NAME = "cache3"; + /** Function to generate cache values. */ + private static final BiFunction<String, Integer, String> GENERATING_FUNC = (s, i) -> s + "_value_" + i; + + /** Flag to block rebalancing. */ + private static final AtomicBoolean blockRebalanceEnabled = new AtomicBoolean(false); + + /** */ + private static final Semaphore fileIoBlockingSemaphore = new Semaphore(Integer.MAX_VALUE); + + /** */ + private boolean useBlockingFileIO; + /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { super.beforeTest(); cleanPersistenceDir(); - - System.setProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING, "true"); } /** {@inheritDoc} */ @@ -88,8 +109,6 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest extends GridCommonAbstra stopAllGrids(); cleanPersistenceDir(); - - System.clearProperty(IgniteSystemProperties.IGNITE_DISABLE_WAL_DURING_REBALANCING); } /** {@inheritDoc} */ @@ -126,6 +145,9 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest extends GridCommonAbstra .setPersistenceEnabled(true) .setMaxSize(256 * 1024 * 1024)); + if (useBlockingFileIO) + dsCfg.setFileIOFactory(new BlockingCheckpointFileIOFactory()); + cfg.setDataStorageConfiguration(dsCfg); } @@ -148,8 +170,8 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest extends GridCommonAbstra ig0.active(true); - for (int i = 0; i < 3; i++) - fillCache(ig0.getOrCreateCache("cache" + i), CACHE_SIZE); + for (int i = 1; i < 4; i++) + fillCache(ig0.dataStreamer("cache" + i), CACHE_SIZE, GENERATING_FUNC); String ig1Name = "node01-" + 
grid(1).localNode().consistentId(); @@ -202,7 +224,7 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest extends GridCommonAbstra public void testServerNodesFromBltLeavesAndJoinsDuringRebalancing() throws Exception { Ignite ig0 = startGridsMultiThreaded(4); - fillCache(ig0.cache(CACHE3_NAME), CACHE_SIZE); + fillCache(ig0.dataStreamer(CACHE3_NAME), CACHE_SIZE, GENERATING_FUNC); List<Integer> nonAffinityKeys1 = nearKeys(grid(1).cache(CACHE3_NAME), 100, CACHE_SIZE / 2); List<Integer> nonAffinityKeys2 = nearKeys(grid(2).cache(CACHE3_NAME), 100, CACHE_SIZE / 2); @@ -215,7 +237,7 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest extends GridCommonAbstra nonAffinityKeysSet.addAll(nonAffinityKeys1); nonAffinityKeysSet.addAll(nonAffinityKeys2); - fillCache(ig0.cache(CACHE3_NAME), nonAffinityKeysSet); + fillCache(ig0.dataStreamer(CACHE3_NAME), nonAffinityKeysSet, GENERATING_FUNC); int groupId = ((IgniteEx) ig0).cachex(CACHE3_NAME).context().groupId(); @@ -244,6 +266,128 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest extends GridCommonAbstra " partitions in MOVING state", allOwned); } + /** + * Scenario: when rebalanced MOVING partitions are being owned by the checkpointer, + * a concurrent affinity change (caused by a BLT change) may cause additional partitions in MOVING state to appear. + * + * In such a situation no partitions should be owned until a new rebalancing process starts and finishes. + * + * @throws Exception If failed. + */ + public void testRebalancedPartitionsOwningWithConcurrentAffinityChange() throws Exception { + Ignite ig0 = startGridsMultiThreaded(4); + fillCache(ig0.dataStreamer(CACHE3_NAME), CACHE_SIZE, GENERATING_FUNC); + + // Stop idx=2 to prepare for baseline topology change later. + stopGrid(2); + + // Stop idx=1 and clean up LFS to trigger full rebalancing after it restarts. 
+ String ig1Name = "node01-" + grid(1).localNode().consistentId(); + stopGrid(1); + cleanPersistenceFiles(ig1Name); + + // Blocking fileIO and blockMessagePredicate to block checkpointer and rebalancing for node idx=1. + useBlockingFileIO = true; + int groupId = ((IgniteEx) ig0).cachex(CACHE3_NAME).context().groupId(); + blockMessagePredicate = (node, msg) -> { + if (blockRebalanceEnabled.get() && msg instanceof GridDhtPartitionDemandMessage) + return ((GridDhtPartitionDemandMessage) msg).groupId() == groupId; + + return false; + }; + + // Enable blocking checkpointer on node idx=1 (see BlockingCheckpointFileIOFactory). + fileIoBlockingSemaphore.drainPermits(); + + IgniteEx ig1 = startGrid(1); + + CacheGroupMetricsMXBean mxBean = ig1.cachex(CACHE3_NAME).context().group().mxBean(); + int locMovingPartsNum = mxBean.getLocalNodeMovingPartitionsCount(); + + // Partitions remain in MOVING state even after PME and rebalancing when checkpointer is blocked. + assertTrue("Expected non-zero value for local moving partitions count on node idx = 1: " + + locMovingPartsNum, 0 < locMovingPartsNum && locMovingPartsNum < CACHE3_PARTS_NUM); + + blockRebalanceEnabled.set(true); + + // Change baseline topology and release checkpointer to verify + // that no partitions will be owned after affinity change. + ig0.cluster().setBaselineTopology(ig1.context().discovery().topologyVersion()); + fileIoBlockingSemaphore.release(Integer.MAX_VALUE); + + locMovingPartsNum = mxBean.getLocalNodeMovingPartitionsCount(); + assertTrue("Expected moving partitions count on node idx = 1 equals to all partitions of the cache " + + CACHE3_NAME + ": " + locMovingPartsNum, locMovingPartsNum == CACHE3_PARTS_NUM); + + TestRecordingCommunicationSpi commSpi = (TestRecordingCommunicationSpi) ig1 + .configuration().getCommunicationSpi(); + + // When we stop blocking demand message rebalancing should complete and all partitions should be owned. 
+ commSpi.stopBlock(); + + boolean res = GridTestUtils.waitForCondition( + () -> mxBean.getLocalNodeMovingPartitionsCount() == 0, 15_000); + + assertTrue("All partitions on node idx = 1 are expected to be owned", res); + + verifyCache(ig1.cache(CACHE3_NAME), GENERATING_FUNC); + } + + /** FileIOFactory implementation that enables blocking of writes to disk so checkpoint can be blocked. */ + private static class BlockingCheckpointFileIOFactory implements FileIOFactory { + /** Serial version uid. */ + private static final long serialVersionUID = 0L; + + /** Delegate factory. */ + private final FileIOFactory delegateFactory = new RandomAccessFileIOFactory(); + + /** {@inheritDoc} */ + @Override public FileIO create(File file, OpenOption... modes) throws IOException { + FileIO delegate = delegateFactory.create(file, modes); + + return new FileIODecorator(delegate) { + @Override public int write(ByteBuffer srcBuf) throws IOException { + if (Thread.currentThread().getName().contains("checkpoint")) { + try { + fileIoBlockingSemaphore.acquire(); + } + catch (InterruptedException ignored) { + // No-op. + } + } + + return delegate.write(srcBuf); + } + + @Override public int write(ByteBuffer srcBuf, long position) throws IOException { + if (Thread.currentThread().getName().contains("checkpoint")) { + try { + fileIoBlockingSemaphore.acquire(); + } + catch (InterruptedException ignored) { + // No-op. + } + } + + return delegate.write(srcBuf, position); + } + + @Override public int write(byte[] buf, int off, int len) throws IOException { + if (Thread.currentThread().getName().contains("checkpoint")) { + try { + fileIoBlockingSemaphore.acquire(); + } + catch (InterruptedException ignored) { + // No-op. 
+ } + } + + return delegate.write(buf, off, len); + } + }; + } + } + /** */ private void cleanPersistenceFiles(String igName) throws Exception { String ig1DbPath = Paths.get(DFLT_STORE_DIR, igName).toString(); @@ -259,14 +403,39 @@ public class IgnitePdsCacheWalDisabledOnRebalancingTest extends GridCommonAbstra } /** */ - private void fillCache(IgniteCache cache, int cacheSize) { + private void fillCache( + IgniteDataStreamer streamer, + int cacheSize, + BiFunction<String, Integer, String> generatingFunc + ) { + String name = streamer.cacheName(); + for (int i = 0; i < cacheSize; i++) - cache.put(i, "value_" + i); + streamer.addData(i, generatingFunc.apply(name, i)); } /** */ - private void fillCache(IgniteCache cache, Collection<Integer> keys) { + private void fillCache( + IgniteDataStreamer streamer, + Collection<Integer> keys, + BiFunction<String, Integer, String> generatingFunc + ) { + String cacheName = streamer.cacheName(); + for (Integer key : keys) - cache.put(key, "value_" + key); + streamer.addData(key, generatingFunc.apply(cacheName, key)); + } + + /** */ + private void verifyCache(IgniteCache cache, BiFunction<String, Integer, String> generatingFunc) { + int size = cache.size(CachePeekMode.PRIMARY); + + String cacheName = cache.getName(); + + for (int i = 0; i < size; i++) { + String value = (String) cache.get(i); + + assertEquals(generatingFunc.apply(cacheName, i), value); + } } }