[ 
https://issues.apache.org/jira/browse/IGNITE-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pavel Pereslegin updated IGNITE-12605:
--------------------------------------
    Description: 
_(scenario: a cluster with 3 nodes; node3 starts historical rebalancing and then node2 leaves the cluster)_
 On the partition map exchange initiated by a baseline node leaving, the historical supplier is not provided in the full message (assignPartitionStates() is not called on the coordinator when a node leaves). Since there is no historical supplier, the "historical" partition gets scheduled for clearing. When the node joins back, assignPartitionStates() is called and a supplier for historical rebalance is available, but the partition may already have been cleared.
 After such a rebalance the "historically rebalanced" node ends up with inconsistent partitions, even though its partition counters and state look consistent.

"Inlined" reproducer uses TestRecordingCommunicationSpi to sync nodes (but this 
issue can be "unstable" reproduced without it (see attachment)).

 

The reproducer fails with the following assertion error.
{noformat}
java.lang.AssertionError: 
|------|-----------------------|
|      |     entries count     |
| part |-----------------------|
|      | node1 | node2 | node3 |
|------|-----------------------|
|   0  |  6250 |  6250 |  3125 | 
|   1  |  6250 |  6250 |  3125 | 
|   2  |  6250 |  6250 |  3125 | 
  ... 
|  31  |  6250 |  6250 |  3125 | 
|------|-------|-------|-------|
{noformat}
(Partitions on node3 had been cleared before historical rebalance started, so node3 kept only the entries written while it was offline.)
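For reference, the numbers in the table follow directly from the test parameters (a quick sanity check, not part of the reproducer; the constants mirror the configuration above):
{code:java}
// 200,000 entries over 32 partitions, and every node owns every partition
// (2 backups on a 3-node baseline), so each partition copy should hold
// 200_000 / 32 = 6250 entries.
int expectedPerPartition = 200_000 / 32; // 6250 on node1 and node2.

// node3 reports 100_000 / 32 = 3125: only the batch loaded while it was offline,
// i.e. the WAL delta applied on top of the already cleared partition.
int node3PerPartition = 100_000 / 32; // 3125.
{code}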

 

Reproducer:
{code:java}
public class WalRebalanceOnCleanPartitionReproducer extends GridCommonAbstractTest {
    /** Block predicate. */
    private P2<ClusterNode, Message> blockPred;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setConsistentId(gridName);

        cfg.setRebalanceThreadPoolSize(1);

        CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME)
            .setCacheMode(CacheMode.PARTITIONED)
            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
            .setBackups(2)
            .setAffinity(new RendezvousAffinityFunction(false, 32))
            .setRebalanceMode(CacheRebalanceMode.ASYNC);

        cfg.setCacheConfiguration(ccfg1);

        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();

        commSpi.blockMessages(blockPred);

        cfg.setCommunicationSpi(commSpi);

        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
            .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
            .setCheckpointFrequency(5_000)
            .setWalMode(WALMode.LOG_ONLY)
            .setPageSize(1024)
            .setWalSegmentSize(8 * 1024 * 1024)
            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                .setName("dfltDataRegion")
                .setPersistenceEnabled(true)
                .setMaxSize(512 * 1024 * 1024)
            );

        cfg.setDataStorageConfiguration(dsCfg);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();

        super.beforeTestsStarted();
    }

    /**
     * Checks that historical (WAL) rebalance does not leave inconsistent partition data
     * after a baseline node leaves the cluster and joins back during rebalancing.
     */
    @Test
    @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
    public void testHistoricalRebalanceRestart() throws Exception {
        IgniteEx crd = startGrid(0);

        crd.cluster().state(ClusterState.ACTIVE);
        crd.cluster().baselineAutoAdjustEnabled(false);

        Ignite node1 = startGrid(1);
        Ignite node2 = startGrid(2);

        List<ClusterNode> blt = new ArrayList<>(crd.context().discovery().aliveServerNodes());

        crd.cluster().setBaselineTopology(blt);

        IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME);

        System.out.println(">>> load 100k entries");
        loadData(cache0, 0, 100_000);

        forceCheckpoint();

        System.out.println(">>> stop node 2");
        node2.close();

        awaitPartitionMapExchange();

        System.out.println(">>> load 100k entries again");
        loadData(cache0, 100_000, 100_000);

        // Block rebalance demand messages for the test cache on nodes started from now on.
        blockPred = (node, msg) -> {
            if (msg instanceof GridDhtPartitionDemandMessage) {
                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;

                return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
            }

            return false;
        };

        // Node 2 joins back; historical (WAL) rebalance is expected for it.
        startGrid(2);

        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));

        spi2.waitForBlocked();
        spi2.stopBlock();

        // Forces rebalancing to restart without assignPartitionStates() being called.
        System.out.println(">>> stop grid 1");
        node1.close();

        spi2.blockMessages(blockPred);
        spi2.waitForBlocked();

        System.out.println(">>> start grid 1");
        startGrid(1);

        spi2.stopBlock();

        // Extra delay to make sure rebalancing has finished (just to be sure).
        U.sleep(3_000);

        awaitPartitionMapExchange();

        verifyPartitionSizes();
    }

    /** */
    private void verifyPartitionSizes() {
        int grids = G.allGrids().size();

        SB buf = new SB();

        for (int p = 0; p < 32; p++) {
            Set<Long> sizesSet = new LinkedHashSet<>();
            List<GridDhtLocalPartition> parts = new ArrayList<>();

            for (int n = 0; n < grids; n++) {
                GridDhtLocalPartition part = grid(n).cachex(DEFAULT_CACHE_NAME)
                    .context().topology().localPartition(p);

                assert part != null;
                assert part.state() == GridDhtPartitionState.OWNING;

                sizesSet.add(part.fullSize());
                parts.add(part);
            }

            if (sizesSet.size() == 1)
                continue;

            buf.a(String.format("\n|  %2d  | ", p));

            for (GridDhtLocalPartition part : parts)
                buf.a(String.format(" %04d", part.fullSize())).a(" | ");
        }

        assertTrue("\n|------|-----------------------|" +
            "\n|      |     entries count     |" +
            "\n| part |-----------------------|" +
            "\n|      | node1 | node2 | node3 |" +
            "\n|------|-----------------------|" +
            buf +
            "\n|------|-------|-------|-------|", buf.length() == 0);
    }

    /** */
    private void loadData(IgniteCache<Integer, String> cache, int off, int cnt) {
        try (IgniteDataStreamer<Integer, String> streamer = grid(0).dataStreamer(cache.getName())) {
            for (int i = off; i < off + cnt; i++)
                streamer.addData(i, String.valueOf(i));
        }
    }
}
{code}


> Historical (WAL) rebalance can start on a cleared partition if some baseline 
> node leaves the cluster and then joins back.
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-12605
>                 URL: https://issues.apache.org/jira/browse/IGNITE-12605
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.9
>         Environment: 
>            Reporter: Pavel Pereslegin
>            Assignee: Pavel Pereslegin
>            Priority: Major
>         Attachments: WalRebalanceOnCleanPartitionReproducerUnstable.java
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
