[ https://issues.apache.org/jira/browse/IGNITE-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17027440#comment-17027440 ]

Pavel Pereslegin commented on IGNITE-12605:
-------------------------------------------

I don’t think that calling the {{assignPartitionStates()}} method when a node 
leaves is the right way to fix this, because I’m pretty sure we may hit a 
similar problem when the only (historical) supplier “blinks”. So I propose an 
even more trivial solution: reset the initial update counter value before 
scheduling the partition for clearing.
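
A minimal self-contained sketch of that idea (class and member names here are 
illustrative assumptions, not the real Ignite internals):
{code:java}
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical sketch of the proposed fix: drop the initial update
 * counter before the partition is scheduled for clearing, so the
 * cleared partition can no longer qualify for historical rebalance.
 */
public class ClearingCounterSketch {
    /** Initial update counter the node reports to the coordinator. */
    private final AtomicLong initialUpdCntr = new AtomicLong(100_000);

    /** Reset the counter BEFORE enqueueing the partition for clearing. */
    void scheduleClearing() {
        // A zero counter never falls inside a supplier's WAL history,
        // so the partition falls back to full rebalance, which is the
        // correct choice for data that is about to be cleared.
        initialUpdCntr.set(0);

        // ... enqueue the partition for clearing ...
    }

    /** Simplified stand-in for the coordinator-side eligibility check. */
    boolean historicalRebalanceApplicable(long supplierEarliestCpCntr) {
        long cntr = initialUpdCntr.get();
        return cntr > 0 && cntr >= supplierEarliestCpCntr;
    }
}
{code}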

> Historical (WAL) rebalance can start on a cleared partition if some baseline node leaves the cluster and then joins back.
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-12605
>                 URL: https://issues.apache.org/jira/browse/IGNITE-12605
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.9
>         Environment: 
>            Reporter: Pavel Pereslegin
>            Assignee: Pavel Pereslegin
>            Priority: Major
>         Attachments: WalRebalanceOnCleanPartitionReproducerUnstable.java
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> _(scenario: a cluster with 3 nodes; node3 starts historical rebalancing and 
> then node2 leaves the cluster)_
>  On the partition map exchange initiated by a baseline node leaving, no 
> historical supplier is provided in the full message (assignPartitionStates() 
> isn't called on the coordinator when a node leaves). Since there is no 
> historical supplier, the "historical" partition is scheduled for clearing. 
> When the node joins back, assignPartitionStates() is called and a historical 
> supplier is assigned, but the partition may already be cleared.
>  After such a rebalance the "historically rebalanced" node ends up with 
> inconsistent partition data, even though its partition counters and state 
> look consistent. The sketch below illustrates this decision.
> "Inlined" reproducer uses TestRecordingCommunicationSpi to sync nodes (but 
> this issue can be "unstable" reproduced without it (see attachment)).
> The reproducer shows the following errors.
> Error 1 (partitions on node3 were cleared before the historical rebalance 
> started):
> {noformat}
> java.lang.AssertionError: 
> |------|-----------------------|
> |      |     entries count     |
> | part |-----------------------|
> |      | node1 | node2 | node3 |
> |------|-----------------------|
> |   0  |  6250 |  6250 |  3125 | 
> |   1  |  6250 |  6250 |  3125 | 
> |   2  |  6250 |  6250 |  3125 | 
>   ... 
> |  31  |  6250 |  6250 |  3125 | 
> |------|-------|-------|-------|
> {noformat}
> Error 2 (needs deeper investigation):
> {noformat}
> java.lang.AssertionError: Reached end of WAL but not all partitions are done
>       at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.advance(GridCacheOffheapManager.java:1419)
>       at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.next(GridCacheOffheapManager.java:1295)
>       at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1255)
>       at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1163)
>       at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.nextX(IgniteRebalanceIteratorImpl.java:135)
>       at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.next(IgniteRebalanceIteratorImpl.java:215)
>       at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.peek(IgniteRebalanceIteratorImpl.java:155)
>       at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier.handleDemandMessage(GridDhtPartitionSupplier.java:316)
>       at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.lambda$handleDemandMessage$1(GridDhtPreloader.java:374)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
>  
> Reproducer:
> {code:java}
> import java.util.ArrayList;
> import java.util.LinkedHashSet;
> import java.util.List;
> import java.util.Set;
> import java.util.concurrent.CountDownLatch;
> import org.apache.ignite.Ignite;
> import org.apache.ignite.IgniteCache;
> import org.apache.ignite.IgniteDataStreamer;
> import org.apache.ignite.cache.CacheAtomicityMode;
> import org.apache.ignite.cache.CacheMode;
> import org.apache.ignite.cache.CacheRebalanceMode;
> import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
> import org.apache.ignite.cluster.ClusterNode;
> import org.apache.ignite.cluster.ClusterState;
> import org.apache.ignite.configuration.CacheConfiguration;
> import org.apache.ignite.configuration.DataRegionConfiguration;
> import org.apache.ignite.configuration.DataStorageConfiguration;
> import org.apache.ignite.configuration.IgniteConfiguration;
> import org.apache.ignite.configuration.WALMode;
> import org.apache.ignite.internal.IgniteEx;
> import org.apache.ignite.internal.TestRecordingCommunicationSpi;
> import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
> import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
> import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
> import org.apache.ignite.internal.util.typedef.G;
> import org.apache.ignite.internal.util.typedef.P2;
> import org.apache.ignite.internal.util.typedef.internal.CU;
> import org.apache.ignite.internal.util.typedef.internal.SB;
> import org.apache.ignite.internal.util.typedef.internal.U;
> import org.apache.ignite.plugin.extensions.communication.Message;
> import org.apache.ignite.testframework.GridTestUtils;
> import org.apache.ignite.testframework.junits.WithSystemProperty;
> import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
> import org.junit.Test;
> import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
> /** */
> public class WalRebalanceOnCleanPartitionReproducer extends GridCommonAbstractTest {
>     /** Block predicate. */
>     private P2<ClusterNode, Message> blockPred;
>     /** {@inheritDoc} */
>     @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
>         IgniteConfiguration cfg = super.getConfiguration(gridName);
>         cfg.setConsistentId(gridName);
>         cfg.setRebalanceThreadPoolSize(1);
>         CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME)
>             .setCacheMode(CacheMode.PARTITIONED)
>             .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
>             .setBackups(2)
>             .setAffinity(new RendezvousAffinityFunction(false, 32))
>             .setRebalanceMode(CacheRebalanceMode.ASYNC);
>         cfg.setCacheConfiguration(ccfg1);
>         TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
>         commSpi.blockMessages(blockPred);
>         cfg.setCommunicationSpi(commSpi);
>         DataStorageConfiguration dsCfg = new DataStorageConfiguration()
>             .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
>             .setCheckpointFrequency(5_000)
>             .setWalMode(WALMode.LOG_ONLY)
>             .setPageSize(1024)
>             .setWalSegmentSize(8 * 1024 * 1024)
>             .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
>                 .setName("dfltDataRegion")
>                 .setPersistenceEnabled(true)
>                 .setMaxSize(512 * 1024 * 1024)
>             );
>         cfg.setDataStorageConfiguration(dsCfg);
>         return cfg;
>     }
>     /** {@inheritDoc} */
>     @Override protected void beforeTestsStarted() throws Exception {
>         stopAllGrids();
>         cleanPersistenceDir();
>         super.beforeTestsStarted();
>     }
>     /**
>      *
>      */
>     @Test
>     @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
>     public void testHistoricalRebalanceNotStartsAfterNodeLeft() throws Exception {
>         IgniteEx crd = startGrid(0);
>         crd.cluster().state(ClusterState.ACTIVE);
>         crd.cluster().baselineAutoAdjustEnabled(false);
>         Ignite node1 = startGrid(1);
>         Ignite node2 = startGrid(2);
>         List<ClusterNode> blt = new ArrayList<>(crd.context().discovery().aliveServerNodes());
>         crd.cluster().setBaselineTopology(blt);
>         IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME);
>         System.out.println(">>> load 100k entries");
>         loadData(cache0, 0, 100_000);
>         forceCheckpoint();
>         System.out.println(">>> stop node 2");
>         node2.close();
>         awaitPartitionMapExchange();
>         System.out.println(">>> load 100k entries again");
>         loadData(cache0, 100_000, 100_000);
>         CountDownLatch startLatch = new CountDownLatch(1);
>         blockPred = (node, msg) -> {
>             if (msg instanceof GridDhtPartitionDemandMessage) {
>                 GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
>                 return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
>             }
>             return false;
>         };
>         GridTestUtils.runAsync(() -> {
>             System.out.println(">>> start grid 2");
>             startGrid(2);
>             startLatch.countDown();
>             return null;
>         });
>         startLatch.await();
>         TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
>         spi2.waitForBlocked();
>         spi2.stopBlock();
>         // Forces rebalancing to restart without assigning partition states.
>         System.out.println(">>> stop grid 1");
>         node1.close();
>         spi2.blockMessages(blockPred);
>         spi2.waitForBlocked();
>         System.out.println(">>> start grid 1");
>         startGrid(1);
>         spi2.stopBlock();
>         // just to be sure
>         U.sleep(3_000);
>         awaitPartitionMapExchange();
>         verifyPartitionSizes();
>     }
>     /** */
>     private void verifyPartitionSizes() {
>         int grids = G.allGrids().size();
>         SB buf = new SB();
>         for (int p = 0; p < 32; p++) {
>             Set<Long> sizesSet = new LinkedHashSet<>();
>             List<GridDhtLocalPartition> parts = new ArrayList<>();
>             for (int n = 0; n < grids; n++) {
>                 GridDhtLocalPartition part = grid(n).cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(p);
>                 assert part != null;
>                 assert part.state() == GridDhtPartitionState.OWNING;
>                 sizesSet.add(part.fullSize());
>                 parts.add(part);
>             }
>             if (sizesSet.size() == 1)
>                 continue;
>             buf.a(String.format("\n|  %2d  | ", p));
>             for (GridDhtLocalPartition part : parts)
>                 buf.a(String.format(" %04d", part.fullSize())).a(" | ");
>         }
>         assertTrue("\n|------|-----------------------|" +
>             "\n|      |     entries count     |" +
>             "\n| part |-----------------------|" +
>             "\n|      | node1 | node2 | node3 |" +
>             "\n|------|-----------------------|" +
>             buf +
>             "\n|------|-------|-------|-------|", buf.length() == 0);
>     }
>     /** */
>     private void loadData(IgniteCache cache, int off, int cnt) {
>         try (IgniteDataStreamer<Integer, String> streamer = grid(0).dataStreamer(cache.getName())) {
>             for (int i = off; i < off + cnt; i++)
>                 streamer.addData(i, String.valueOf(i));
>         }
>     }
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
