Pavel Pereslegin created IGNITE-12605: -----------------------------------------
Summary: Historical (WAL) rebalance can start on a cleared partition if some baseline node leaves the cluster and then joins back. Key: IGNITE-12605 URL: https://issues.apache.org/jira/browse/IGNITE-12605 Project: Ignite Issue Type: Bug Affects Versions: 2.9 Environment: Reporter: Pavel Pereslegin Assignee: Pavel Pereslegin On partition map exchange initiated by baseline node leaving, the historical supplier is not provided in the full message (assignPartitionStates() isn't called when a node leaves). Since we don't have a historical supplier "historical" partition scheduled for clearing, then when a node joins back assignPartitionStates() is called and we have a supplier for historical rebalance, but partition may be cleared already. After such rebalance we have inconsistent partitions on a "historically rebalanced" node (with consistent partition counters and state). "Inlined" reproducer uses TestRecordingCommunicationSpi to sync nodes (but this issue can be "unstable" reproduced without it (see attachment)). Reproducer shows the following errors. Error 1 (partitions have been cleared). {noformat} java.lang.AssertionError: |------|-----------------------| | | entries count | | part |-----------------------| | | node1 | node2 | node3 | |------|-----------------------| | 0 | 6250 | 6250 | 3125 | | 1 | 6250 | 6250 | 3125 | | 2 | 6250 | 6250 | 3125 | ... | 31 | 6250 | 6250 | 3125 | |------|-------|-------|-------| {noformat} Error 2 (should be investigated deeply). {noformat} java.lang.AssertionError: Reached end of WAL but not all partitions are done at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.advance(GridCacheOffheapManager.java:1419) at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.next(GridCacheOffheapManager.java:1295) at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1255) at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1163) at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.nextX(IgniteRebalanceIteratorImpl.java:135) at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.next(IgniteRebalanceIteratorImpl.java:215) at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.peek(IgniteRebalanceIteratorImpl.java:155) at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier.handleDemandMessage(GridDhtPartitionSupplier.java:316) at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.lambda$handleDemandMessage$1(GridDhtPreloader.java:374) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {noformat} Reproducer: {code:java} import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CacheRebalanceMode; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.SB; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.WithSystemProperty; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD; /** */ public class WalRebalanceOnCleanPartitionReproducer extends GridCommonAbstractTest { /** Block predicate. */ private P2<ClusterNode, Message> blockPred; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(gridName); cfg.setConsistentId(gridName); cfg.setRebalanceThreadPoolSize(1); CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME) .setCacheMode(CacheMode.PARTITIONED) .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) .setBackups(2) .setAffinity(new RendezvousAffinityFunction(false, 32)) .setRebalanceMode(CacheRebalanceMode.ASYNC); cfg.setCacheConfiguration(ccfg1); TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi(); commSpi.blockMessages(blockPred); cfg.setCommunicationSpi(commSpi); DataStorageConfiguration dsCfg = new DataStorageConfiguration() .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4) .setCheckpointFrequency(5_000) .setWalMode(WALMode.LOG_ONLY) .setPageSize(1024) .setWalSegmentSize(8 * 1024 * 1024) .setDefaultDataRegionConfiguration(new DataRegionConfiguration() .setName("dfltDataRegion") .setPersistenceEnabled(true) .setMaxSize(512 * 1024 * 1024) ); cfg.setDataStorageConfiguration(dsCfg); return cfg; } /** {@inheritDoc} */ @Override protected void beforeTestsStarted() throws Exception { stopAllGrids(); cleanPersistenceDir(); super.beforeTestsStarted(); } /** * */ @Test @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0") public void testHistoricalRebalanceNotStartsAfterNodeLeft() throws Exception { IgniteEx crd = startGrid(0); crd.cluster().state(ClusterState.ACTIVE); crd.cluster().baselineAutoAdjustEnabled(false); Ignite node1 = startGrid(1); Ignite node2 = startGrid(2); List<ClusterNode> blt = new ArrayList<>(crd.context().discovery().aliveServerNodes()); crd.cluster().setBaselineTopology(blt); IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME); System.out.println(">>> load 100k entries"); loadData(cache0, 0, 100_000); forceCheckpoint(); System.out.println(">>> stop node 2"); node2.close(); awaitPartitionMapExchange(); System.out.println(">>> load 100k entries again"); loadData(cache0, 100_000, 100_000); CountDownLatch startLatch = new CountDownLatch(1); blockPred = (node, msg) -> { if (msg instanceof GridDhtPartitionDemandMessage) { GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg; return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME); } return false; }; GridTestUtils.runAsync(() -> { System.out.println(">>> start grid 2"); startGrid(2); startLatch.countDown(); return null; }); startLatch.await(); TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2)); spi2.waitForBlocked(); spi2.stopBlock(); // Forces rebalanceing to restart without assign partition states. System.out.println(">>> stop grid 1"); node1.close(); spi2.blockMessages(blockPred); spi2.waitForBlocked(); System.out.println(">>> start grid 1"); startGrid(1); spi2.stopBlock(); // just to be sure U.sleep(3_000); awaitPartitionMapExchange(); verifyPartittionSizes(); } /** */ private void verifyPartittionSizes() { int grids = G.allGrids().size(); SB buf = new SB(); for (int p = 0; p < 32; p++) { Set<Long> sizesSet = new LinkedHashSet<>(); List<GridDhtLocalPartition> parts = new ArrayList<>(); for (int n = 0; n < grids; n++) { GridDhtLocalPartition part = grid(n).cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(p); assert part != null; assert part.state() == GridDhtPartitionState.OWNING; sizesSet.add(part.fullSize()); parts.add(part); } if (sizesSet.size() == 1) continue; buf.a(String.format("\n| %2d | ", p)); for (GridDhtLocalPartition part : parts) buf.a(String.format(" %04d", part.fullSize())).a(" | "); } assertTrue("\n|------|-----------------------|" + "\n| | entries count |" + "\n| part |-----------------------|" + "\n| | node1 | node2 | node3 |" + "\n|------|-----------------------|" + buf + "\n|------|-------|-------|-------|", buf.length() == 0); } /** */ private void loadData(IgniteCache cache, int off, int cnt) { try (IgniteDataStreamer<Integer, String> streamer = grid(0).dataStreamer(cache.getName())) { for (int i = off; i < off + cnt; i++) streamer.addData(i, String.valueOf(i)); } } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)