[ https://issues.apache.org/jira/browse/IGNITE-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Pavel Pereslegin updated IGNITE-12605:
--------------------------------------
    Description: 
(_Scenario: a cluster of 3 nodes where node3 starts historical rebalancing when node2 
leaves the cluster._)
On the partition map exchange initiated by a baseline node leaving, the historical 
supplier is not provided in the full message (assignPartitionStates() is not called 
on the coordinator when a node leaves). Since there is no historical supplier, the 
"historical" partition is scheduled for clearing; when the node joins back, 
assignPartitionStates() is called and a supplier for historical rebalance is assigned, 
but the partition may already have been cleared.
 After such a rebalance the "historically rebalanced" node ends up with inconsistent 
partition content, even though its partition counters and state look consistent.

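The broken condition can be sketched as follows (hypothetical names, for illustration 
only, not the actual Ignite internals): a historical supplier must not be assigned to a 
partition that the demander has already cleared, because replaying the WAL delta on top 
of an empty partition yields consistent counters but inconsistent content.
{code:java}
/**
 * Hypothetical sketch (NOT actual Ignite code) of the condition described above.
 * All names here are made up for illustration.
 */
final class HistoricalRebalanceApplicability {
    /**
     * @param partitionCleared Whether the demander has already cleared the partition.
     * @param demanderCntr Last update counter applied on the demander.
     * @param supplierHistFrom Lowest update counter still covered by the supplier's WAL history.
     * @return {@code true} if a historical (WAL) rebalance can produce a consistent partition.
     */
    static boolean historicalRebalanceApplicable(boolean partitionCleared, long demanderCntr, long supplierHistFrom) {
        // A cleared partition has lost the entries that are older than the WAL delta, so
        // replaying the delta yields consistent counters but inconsistent content, which is
        // exactly the situation reported in this issue. Only a full rebalance can repair it.
        if (partitionCleared)
            return false;

        // Otherwise the supplier's WAL history must reach back to the demander's counter.
        return supplierHistFrom <= demanderCntr;
    }
}
{code}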
"Inlined" reproducer uses TestRecordingCommunicationSpi to sync nodes (but this 
issue can be "unstable" reproduced without it (see attachment)).

The reproducer shows the following errors.

Error 1 (partitions on node3 were cleared before the historical rebalance started, so 
each partition ends up with roughly half of the expected entries: only the WAL delta 
was applied on top of the cleared partitions).
{noformat}
java.lang.AssertionError: 
|------|-----------------------|
|      |     entries count     |
| part |-----------------------|
|      | node1 | node2 | node3 |
|------|-----------------------|
|   0  |  6250 |  6250 |  3125 | 
|   1  |  6250 |  6250 |  3125 | 
|   2  |  6250 |  6250 |  3125 | 
  ... 
|  31  |  6250 |  6250 |  3125 | 
|------|-------|-------|-------|
{noformat}
Error 2 (needs to be investigated more deeply).
{noformat}
java.lang.AssertionError: Reached end of WAL but not all partitions are done
        at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.advance(GridCacheOffheapManager.java:1419)
        at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.next(GridCacheOffheapManager.java:1295)
        at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1255)
        at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1163)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.nextX(IgniteRebalanceIteratorImpl.java:135)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.next(IgniteRebalanceIteratorImpl.java:215)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.peek(IgniteRebalanceIteratorImpl.java:155)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier.handleDemandMessage(GridDhtPartitionSupplier.java:316)
        at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.lambda$handleDemandMessage$1(GridDhtPreloader.java:374)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{noformat}
 

Reproducer:
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;

/** */
public class WalRebalanceOnCleanPartitionReproducer extends GridCommonAbstractTest {
    /** Block predicate. */
    private P2<ClusterNode, Message> blockPred;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setConsistentId(gridName);

        cfg.setRebalanceThreadPoolSize(1);

        CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME)
            .setCacheMode(CacheMode.PARTITIONED)
            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
            .setBackups(2)
            .setAffinity(new RendezvousAffinityFunction(false, 32))
            .setRebalanceMode(CacheRebalanceMode.ASYNC);

        cfg.setCacheConfiguration(ccfg1);

        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();

        commSpi.blockMessages(blockPred);

        cfg.setCommunicationSpi(commSpi);

        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
            .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
            .setCheckpointFrequency(5_000)
            .setWalMode(WALMode.LOG_ONLY)
            .setPageSize(1024)
            .setWalSegmentSize(8 * 1024 * 1024)
            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                .setName("dfltDataRegion")
                .setPersistenceEnabled(true)
                .setMaxSize(512 * 1024 * 1024)
            );

        cfg.setDataStorageConfiguration(dsCfg);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();

        super.beforeTestsStarted();
    }

    /**
     * Reproduces historical (WAL) rebalance starting on a cleared partition after a
     * baseline node leaves the cluster and then joins back.
     */
    @Test
    @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
    public void testHistoricalRebalanceNotStartsAfterNodeLeft() throws Exception {
        IgniteEx crd = startGrid(0);

        crd.cluster().state(ClusterState.ACTIVE);
        crd.cluster().baselineAutoAdjustEnabled(false);

        Ignite node1 = startGrid(1);
        Ignite node2 = startGrid(2);

        List<ClusterNode> blt = new ArrayList<>(crd.context().discovery().aliveServerNodes());

        crd.cluster().setBaselineTopology(blt);

        IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME);

        System.out.println(">>> load 100k entries");
        loadData(cache0, 0, 100_000);

        forceCheckpoint();

        System.out.println(">>> stop node 2");
        node2.close();

        awaitPartitionMapExchange();

        System.out.println(">>> load 100k entries again");
        loadData(cache0, 100_000, 100_000);

        CountDownLatch startLatch = new CountDownLatch(1);

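        // Block outgoing rebalance demand messages for the test cache so that node 2's
        // historical rebalance can be paused at well-defined points.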
        blockPred = (node, msg) -> {
            if (msg instanceof GridDhtPartitionDemandMessage) {
                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;

                return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
            }

            return false;
        };

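        // Restart node 2 in a separate thread; its rebalance demand messages are blocked by the predicate above.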
        GridTestUtils.runAsync(() -> {
            System.out.println(">>> start grid 2");
            startGrid(2);

            startLatch.countDown();

            return null;
        });

        startLatch.await();

        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));

        spi2.waitForBlocked();
        spi2.stopBlock();

        // Force rebalancing to restart without assignPartitionStates() being called.
        System.out.println(">>> stop grid 1");
        node1.close();

        spi2.blockMessages(blockPred);
        spi2.waitForBlocked();

        System.out.println(">>> start grid 1");
        startGrid(1);

        spi2.stopBlock();

        // Extra delay, just to be sure rebalancing has completed.
        U.sleep(3_000);

        awaitPartitionMapExchange();

        verifyPartitionSizes();
    }

    /** */
    private void verifyPartitionSizes() {
        int grids = G.allGrids().size();

        SB buf = new SB();

        for (int p = 0; p < 32; p++) {
            Set<Long> sizesSet = new LinkedHashSet<>();
            List<GridDhtLocalPartition> parts = new ArrayList<>();

            for (int n = 0; n < grids; n++) {
                GridDhtLocalPartition part = grid(n).cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(p);

                assert part != null;
                assert part.state() == GridDhtPartitionState.OWNING;

                sizesSet.add(part.fullSize());
                parts.add(part);
            }

            if (sizesSet.size() == 1)
                continue;

            buf.a(String.format("\n|  %2d  | ", p));

            for (GridDhtLocalPartition part : parts)
                buf.a(String.format(" %04d", part.fullSize())).a(" | ");
        }

        assertTrue("\n|------|-----------------------|" +
            "\n|      |     entries count     |" +
            "\n| part |-----------------------|" +
            "\n|      | node1 | node2 | node3 |" +
            "\n|------|-----------------------|" +
            buf +
            "\n|------|-------|-------|-------|", buf.length() == 0);
    }

    /** */
    private void loadData(IgniteCache cache, int off, int cnt) {
        try (IgniteDataStreamer<Integer, String> streamer = grid(0).dataStreamer(cache.getName())) {
            for (int i = off; i < off + cnt; i++)
                streamer.addData(i, String.valueOf(i));
        }
    }
}
{code}


> Historical (WAL) rebalance can start on a cleared partition if some baseline 
> node leaves the cluster and then joins back.
> -------------------------------------------------------------------------------------------------------------------------
>
>                 Key: IGNITE-12605
>                 URL: https://issues.apache.org/jira/browse/IGNITE-12605
>             Project: Ignite
>          Issue Type: Bug
>    Affects Versions: 2.9
>         Environment: 
>            Reporter: Pavel Pereslegin
>            Assignee: Pavel Pereslegin
>            Priority: Major
>         Attachments: WalRebalanceOnCleanPartitionReproducerUnstable.java
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
