[ https://issues.apache.org/jira/browse/IGNITE-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alexey Scherbakov updated IGNITE-12605:
---------------------------------------
    Reviewer: Alexey Scherbakov
 Environment:     (was: )

> Historical (WAL) rebalance can start on a cleared partition if some baseline
> node leaves the cluster and then joins back.
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-12605
>                 URL: https://issues.apache.org/jira/browse/IGNITE-12605
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.9
>            Reporter: Pavel Pereslegin
>            Assignee: Pavel Pereslegin
>            Priority: Major
>         Attachments: WalRebalanceOnCleanPartitionReproducerUnstable.java
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> _(Scenario: a cluster with 3 nodes; node3 starts historical rebalancing and then node2 leaves the cluster.)_
> On the partition map exchange initiated by a baseline node leaving, the historical supplier is not provided in the full message (assignPartitionStates() is not called on the coordinator when a node leaves). Since we don't have a historical supplier, the "historical" partition is scheduled for clearing. When the node joins back, assignPartitionStates() is called and we do get a supplier for historical rebalance, but the partition may already be cleared.
> After such a rebalance the "historically rebalanced" node has inconsistent partition contents, even though the partition counters and state look consistent.
> The "inlined" reproducer uses TestRecordingCommunicationSpi to sync nodes (the issue can also be reproduced without it, though less reliably; see the attachment).
>
> The reproducer fails with the following error:
> {noformat}
> java.lang.AssertionError:
> |------|-----------------------|
> |      |     entries count     |
> | part |-----------------------|
> |      | node1 | node2 | node3 |
> |------|-----------------------|
> |    0 |  6250 |  6250 |  3125 |
> |    1 |  6250 |  6250 |  3125 |
> |    2 |  6250 |  6250 |  3125 |
> ...
> |   31 |  6250 |  6250 |  3125 |
> |------|-------|-------|-------|
> {noformat}
> (Partitions on node3 were cleared before historical rebalancing started.)
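>
> To see why this corrupts data: historical rebalance replays WAL records on top of the demander's update counter instead of re-sending entries, so a partition cleared after the supplier was assigned never gets its old entries back. A minimal sketch of the broken invariant follows; all names in it are hypothetical, for illustration only, and are not the actual Ignite internals.
> {code:java}
> /** Hypothetical model, for illustration only (not Ignite code). */
> public class HistoricalRebalanceSketch {
>     enum RebalanceMode { HISTORICAL, FULL }
>
>     /**
>      * Picks a rebalance mode for a partition. A partition that was
>      * cleared while keeping its update counter must not be rebalanced
>      * historically: the WAL replay starts after the counter and skips
>      * every entry that existed before the clearing.
>      */
>     static RebalanceMode safeMode(long updateCntr, long fullSize) {
>         // Cleared partition: entries are gone, but the counter survived.
>         boolean clearedButCntrKept = fullSize == 0 && updateCntr > 0;
>
>         return clearedButCntrKept ? RebalanceMode.FULL : RebalanceMode.HISTORICAL;
>     }
> }
> {code}
> In the table above this is exactly the 3125 vs. 6250 mismatch: node3 replays only the second 100k load (100_000 / 32 = 3125 entries per partition), while the first 3125 entries per partition were removed by the clearing.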
>
> Reproducer:
> {code:java}
> import java.util.ArrayList;
> import java.util.LinkedHashSet;
> import java.util.List;
> import java.util.Set;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.IgniteDataStreamer;
> import org.apache.ignite.cache.CacheAtomicityMode;
> import org.apache.ignite.cache.CacheMode;
> import org.apache.ignite.cache.CacheRebalanceMode;
> import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
> import org.apache.ignite.cluster.ClusterNode;
> import org.apache.ignite.cluster.ClusterState;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.DataRegionConfiguration;
> import org.apache.ignite.configuration.DataStorageConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.configuration.WALMode;
> import org.apache.ignite.internal.IgniteEx;
> import org.apache.ignite.internal.TestRecordingCommunicationSpi;
> import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
> import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
> import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
> import org.apache.ignite.internal.util.typedef.G;
> import org.apache.ignite.internal.util.typedef.P2;
> import org.apache.ignite.internal.util.typedef.internal.CU;
> import org.apache.ignite.internal.util.typedef.internal.SB;
> import org.apache.ignite.internal.util.typedef.internal.U;
> import org.apache.ignite.plugin.extensions.communication.Message;
> import org.apache.ignite.testframework.junits.WithSystemProperty;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.junit.Test;
>
> import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
>
> public class WalRebalanceOnCleanPartitionReproducer extends GridCommonAbstractTest {
>     /** Block predicate. */
>     private P2<ClusterNode, Message> blockPred;
>
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
>         IgniteConfiguration cfg = super.getConfiguration(gridName);
>
>         cfg.setConsistentId(gridName);
>         cfg.setRebalanceThreadPoolSize(1);
>
>         CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME)
>             .setCacheMode(CacheMode.PARTITIONED)
>             .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
>             .setBackups(2)
>             .setAffinity(new RendezvousAffinityFunction(false, 32))
>             .setRebalanceMode(CacheRebalanceMode.ASYNC);
>
>         cfg.setCacheConfiguration(ccfg1);
>
>         TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
>
>         commSpi.blockMessages(blockPred);
>
>         cfg.setCommunicationSpi(commSpi);
>
>         DataStorageConfiguration dsCfg = new DataStorageConfiguration()
>             .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
>             .setCheckpointFrequency(5_000)
>             .setWalMode(WALMode.LOG_ONLY)
>             .setPageSize(1024)
>             .setWalSegmentSize(8 * 1024 * 1024)
>             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
>                 .setName("dfltDataRegion")
>                 .setPersistenceEnabled(true)
>                 .setMaxSize(512 * 1024 * 1024)
>             );
>
>         cfg.setDataStorageConfiguration(dsCfg);
>
>         return cfg;
>     }
>
>     /** {@inheritDoc} */
>     @Override protected void beforeTestsStarted() throws Exception {
>         stopAllGrids();
>
>         cleanPersistenceDir();
>
>         super.beforeTestsStarted();
>     }
>
>     /** Reproduces historical rebalance starting on a cleared partition. */
>     @Test
>     @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
>     public void testHistoricalRebalanceRestart() throws Exception {
>         IgniteEx crd = startGrid(0);
>
>         crd.cluster().state(ClusterState.ACTIVE);
>         crd.cluster().baselineAutoAdjustEnabled(false);
>
>         Ignite node1 = startGrid(1);
>         Ignite node2 = startGrid(2);
>
>         List<ClusterNode> blt = new ArrayList<>(crd.context().discovery().aliveServerNodes());
>
>         crd.cluster().setBaselineTopology(blt);
>
>         IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME);
>
>         System.out.println(">>> load 100k entries");
>
>         loadData(cache0, 0, 100_000);
>
>         forceCheckpoint();
>
>         System.out.println(">>> stop node 2");
>
>         node2.close();
>
>         awaitPartitionMapExchange();
>
>         System.out.println(">>> load 100k entries again");
>
>         loadData(cache0, 100_000, 100_000);
>
>         // Block rebalance demand messages for the default cache group.
>         blockPred = (node, msg) -> {
>             if (msg instanceof GridDhtPartitionDemandMessage) {
>                 GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
>
>                 return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
>             }
>
>             return false;
>         };
>
>         startGrid(2);
>
>         TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
>
>         spi2.waitForBlocked();
>         spi2.stopBlock();
>
>         // Forces rebalancing to restart without assigning partition states.
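>         // (Stopping grid(1) triggers a PME that does not call
>         // assignPartitionStates(): the full message carries no historical
>         // supplier, so grid(2)'s partitions are scheduled for clearing.
>         // The restarted rebalance's demand message is blocked below and is
>         // released only after grid(1) rejoins, when assignPartitionStates()
>         // assigns a historical supplier for the already-cleared partitions.)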
>         System.out.println(">>> stop grid 1");
>
>         node1.close();
>
>         spi2.blockMessages(blockPred);
>         spi2.waitForBlocked();
>
>         System.out.println(">>> start grid 1");
>
>         startGrid(1);
>
>         spi2.stopBlock();
>
>         // Just to be sure.
>         U.sleep(3_000);
>
>         awaitPartitionMapExchange();
>
>         verifyPartitionSizes();
>     }
>
>     /** Checks that every partition has the same number of entries on each node. */
>     private void verifyPartitionSizes() {
>         int grids = G.allGrids().size();
>
>         SB buf = new SB();
>
>         for (int p = 0; p < 32; p++) {
>             Set<Long> sizesSet = new LinkedHashSet<>();
>             List<GridDhtLocalPartition> parts = new ArrayList<>();
>
>             for (int n = 0; n < grids; n++) {
>                 GridDhtLocalPartition part = grid(n).cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(p);
>
>                 assert part != null;
>                 assert part.state() == GridDhtPartitionState.OWNING;
>
>                 sizesSet.add(part.fullSize());
>                 parts.add(part);
>             }
>
>             if (sizesSet.size() == 1)
>                 continue;
>
>             buf.a(String.format("\n| %2d | ", p));
>
>             for (GridDhtLocalPartition part : parts)
>                 buf.a(String.format(" %04d", part.fullSize())).a(" | ");
>         }
>
>         assertTrue("\n|------|-----------------------|" +
>             "\n|      |     entries count     |" +
>             "\n| part |-----------------------|" +
>             "\n|      | node1 | node2 | node3 |" +
>             "\n|------|-----------------------|" +
>             buf +
>             "\n|------|-------|-------|-------|", buf.length() == 0);
>     }
>
>     /** Loads {@code cnt} sequential entries starting from key {@code off}. */
>     private void loadData(IgniteCache cache, int off, int cnt) {
>         try (IgniteDataStreamer<Integer, String> streamer = grid(0).dataStreamer(cache.getName())) {
>             for (int i = off; i < off + cnt; i++)
>                 streamer.addData(i, String.valueOf(i));
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)