[ https://issues.apache.org/jira/browse/IGNITE-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alexey Scherbakov updated IGNITE-12605:
---------------------------------------
       Reviewer: Alexey Scherbakov
    Environment:     (was: )

> Historical (WAL) rebalance can start on a cleared partition if some baseline 
> node leaves the cluster and then joins back.
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-12605
>                 URL: https://issues.apache.org/jira/browse/IGNITE-12605
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.9
>            Reporter: Pavel Pereslegin
>            Assignee: Pavel Pereslegin
>            Priority: Major
>         Attachments: WalRebalanceOnCleanPartitionReproducerUnstable.java
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> _(scenario: a cluster of 3 nodes; node3 starts historical rebalancing and
> then node2 leaves the cluster)_
>  On the partition map exchange initiated by a baseline node leaving, the
> historical supplier is not provided in the full message
> (assignPartitionStates() isn't called on the coordinator when a node
> leaves). Since there is no historical supplier, the "historical" partition
> is scheduled for clearing; when the node joins back,
> assignPartitionStates() is called and a supplier for historical rebalance
> is assigned, but the partition may already be cleared.
>  After such a rebalance the "historically rebalanced" node ends up with
> inconsistent partitions, even though its partition counters and state look
> consistent.
> "Inlined" reproducer uses TestRecordingCommunicationSpi to sync nodes (but 
> this issue can be "unstable" reproduced without it (see attachment)).
>  
> The reproducer fails with the following error:
> {noformat}
> java.lang.AssertionError: 
> |------|-----------------------|
> |      |     entries count     |
> | part |-----------------------|
> |      | node1 | node2 | node3 |
> |------|-----------------------|
> |   0  |  6250 |  6250 |  3125 | 
> |   1  |  6250 |  6250 |  3125 | 
> |   2  |  6250 |  6250 |  3125 | 
>   ... 
> |  31  |  6250 |  6250 |  3125 | 
> |------|-------|-------|-------|
> {noformat}
> (the partitions on node3 were cleared before the historical rebalance started).
>  
> Reproducer:
> {code:java}
> public class WalRebalanceOnCleanPartitionReproducer extends GridCommonAbstractTest {
>     /** Block predicate. */
>     private P2<ClusterNode, Message> blockPred;
>
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
>         IgniteConfiguration cfg = super.getConfiguration(gridName);
>
>         cfg.setConsistentId(gridName);
>         cfg.setRebalanceThreadPoolSize(1);
>
>         CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME)
>             .setCacheMode(CacheMode.PARTITIONED)
>             .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
>             .setBackups(2)
>             .setAffinity(new RendezvousAffinityFunction(false, 32))
>             .setRebalanceMode(CacheRebalanceMode.ASYNC);
>
>         cfg.setCacheConfiguration(ccfg1);
>
>         TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
>
>         commSpi.blockMessages(blockPred);
>
>         cfg.setCommunicationSpi(commSpi);
>
>         DataStorageConfiguration dsCfg = new DataStorageConfiguration()
>             .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
>             .setCheckpointFrequency(5_000)
>             .setWalMode(WALMode.LOG_ONLY)
>             .setPageSize(1024)
>             .setWalSegmentSize(8 * 1024 * 1024)
>             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
>                 .setName("dfltDataRegion")
>                 .setPersistenceEnabled(true)
>                 .setMaxSize(512 * 1024 * 1024)
>             );
>
>         cfg.setDataStorageConfiguration(dsCfg);
>
>         return cfg;
>     }
>
>     /** {@inheritDoc} */
>     @Override protected void beforeTestsStarted() throws Exception {
>         stopAllGrids();
>         cleanPersistenceDir();
>
>         super.beforeTestsStarted();
>     }
>
>     /** Reproduces historical rebalance starting on an already cleared partition. */
>     @Test
>     @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
>     public void testHistoricalRebalanceRestart() throws Exception {
>         IgniteEx crd = startGrid(0);
>
>         crd.cluster().state(ClusterState.ACTIVE);
>         crd.cluster().baselineAutoAdjustEnabled(false);
>
>         Ignite node1 = startGrid(1);
>         Ignite node2 = startGrid(2);
>
>         List<ClusterNode> blt = new ArrayList<>(crd.context().discovery().aliveServerNodes());
>
>         crd.cluster().setBaselineTopology(blt);
>
>         IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME);
>
>         System.out.println(">>> load 100k entries");
>         loadData(cache0, 0, 100_000);
>
>         forceCheckpoint();
>
>         System.out.println(">>> stop node 2");
>         node2.close();
>
>         awaitPartitionMapExchange();
>
>         System.out.println(">>> load 100k entries again");
>         loadData(cache0, 100_000, 100_000);
>
>         // Block outgoing demand messages for the test cache group on the restarted node.
>         blockPred = (node, msg) -> {
>             if (msg instanceof GridDhtPartitionDemandMessage) {
>                 GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
>
>                 return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
>             }
>
>             return false;
>         };
>
>         startGrid(2);
>
>         TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
>
>         spi2.waitForBlocked();
>         spi2.stopBlock();
>
>         // Forces rebalancing to restart without assigning partition states.
>         System.out.println(">>> stop grid 1");
>         node1.close();
>
>         spi2.blockMessages(blockPred);
>         spi2.waitForBlocked();
>
>         System.out.println(">>> start grid 1");
>         startGrid(1);
>
>         spi2.stopBlock();
>
>         // Just to be sure.
>         U.sleep(3_000);
>
>         awaitPartitionMapExchange();
>
>         verifyPartitionSizes();
>     }
>
>     /** Checks that every partition has the same number of entries on all nodes. */
>     private void verifyPartitionSizes() {
>         int grids = G.allGrids().size();
>
>         SB buf = new SB();
>
>         for (int p = 0; p < 32; p++) {
>             Set<Long> sizesSet = new LinkedHashSet<>();
>             List<GridDhtLocalPartition> parts = new ArrayList<>();
>
>             for (int n = 0; n < grids; n++) {
>                 GridDhtLocalPartition part = grid(n).cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(p);
>
>                 assert part != null;
>                 assert part.state() == GridDhtPartitionState.OWNING;
>
>                 sizesSet.add(part.fullSize());
>                 parts.add(part);
>             }
>
>             if (sizesSet.size() == 1)
>                 continue;
>
>             buf.a(String.format("\n|  %2d  | ", p));
>
>             for (GridDhtLocalPartition part : parts)
>                 buf.a(String.format(" %04d", part.fullSize())).a(" | ");
>         }
>
>         assertTrue("\n|------|-----------------------|" +
>             "\n|      |     entries count     |" +
>             "\n| part |-----------------------|" +
>             "\n|      | node1 | node2 | node3 |" +
>             "\n|------|-----------------------|" +
>             buf +
>             "\n|------|-------|-------|-------|", buf.length() == 0);
>     }
>
>     /** Loads {@code cnt} sequential entries starting from {@code off} through a data streamer. */
>     private void loadData(IgniteCache cache, int off, int cnt) {
>         try (IgniteDataStreamer<Integer, String> streamer = grid(0).dataStreamer(cache.getName())) {
>             for (int i = off; i < off + cnt; i++)
>                 streamer.addData(i, String.valueOf(i));
>         }
>     }
> }
> {code}
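> A note on the reproducer's setup: the @WithSystemProperty annotation sets
> IGNITE_PDS_WAL_REBALANCE_THRESHOLD to 0, which should make every partition
> eligible for historical (WAL) rebalance regardless of its size, so the
> restarted node is rebalanced from the WAL rather than via full preloading.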



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
