Alexei Scherbakov created IGNITE-9188: -----------------------------------------
Summary: Unexpected eviction leading to data loss in a scenario with stopping/restarting nodes during rebalancing
Key: IGNITE-9188
URL: https://issues.apache.org/jira/browse/IGNITE-9188
Project: Ignite
Issue Type: Bug
Reporter: Alexei Scherbakov
Assignee: Alexei Scherbakov
Fix For: 2.7

Scenario:
1. Split grid nodes into two groups with distinct partition mappings: one group holds even partitions, the other holds odd ones. Rebalancing of even partitions is only triggered when the number of nodes in the grid exceeds the n/2 threshold.
2. Start n/2 nodes, activate the cluster, and put data into even partitions.
3. Start the other n/2 nodes, change the BLT, and delay rebalancing of even partitions.
4. Stop the newly started nodes before rebalancing is finished.

Expected behavior: partitions in the "even" group keep the OWNING state.
Actual behavior: even partitions are evicted, leading to data loss.

Unit test reproducer:
{noformat}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/ package org.apache.ignite.internal.processors.cache.distributed; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.ignite.Ignite; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.AffinityFunctionContext; import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.processors.cache.GridCacheUtils; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage; import org.apache.ignite.internal.util.typedef.G; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; /** * */ public class CacheLostPartitionsRestoreStateTest extends GridCommonAbstractTest { /** */ public static final long MB = 1024 * 1024L; /** */ public static final String GRP_ATTR = "grp"; /** */ public static final int GRIDS_CNT = 6; /** */ public static final String CACHE_1 = "filled"; /** */ public static final String 
CACHE_2 = "empty"; /** */ public static final String EVEN_GRP = "event"; /** */ public static final String ODD_GRP = "odd"; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); CacheConfiguration ccfg = new CacheConfiguration("default"); ccfg.setAffinity(new RendezvousAffinityFunction(false, CacheConfiguration.MAX_PARTITIONS_COUNT)); cfg.setCacheConfiguration(ccfg); cfg.setPeerClassLoadingEnabled(true); Map<String, Object> attrs = new HashMap<>(); attrs.put(GRP_ATTR, grp(getTestIgniteInstanceIndex(igniteInstanceName))); cfg.setUserAttributes(attrs); DataStorageConfiguration memCfg = new DataStorageConfiguration() .setDefaultDataRegionConfiguration( new DataRegionConfiguration().setPersistenceEnabled(true).setInitialSize(50 * MB).setMaxSize(50 * MB)) .setWalMode(WALMode.LOG_ONLY); cfg.setDataStorageConfiguration(memCfg); cfg.setCacheConfiguration(configuration(CACHE_1), configuration(CACHE_2)); return cfg; } /** * @param name Name. */ private CacheConfiguration configuration(String name) { return new CacheConfiguration(name). setCacheMode(CacheMode.PARTITIONED). setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL). setBackups(2). setRebalanceBatchSize(1). setAffinity(new TestAffinityFunction().setPartitions(32)); } /** * @param idx Index. */ private String grp(int idx) { return idx < GRIDS_CNT / 2 ? EVEN_GRP : ODD_GRP; } /** * @throws Exception if failed. 
*/ public void test() throws Exception { try { Ignite ignite = startGridsMultiThreaded(GRIDS_CNT / 2, false); ignite.cluster().active(true); awaitPartitionMapExchange(); int blockPartId = 1; int c = 0; for (int i = 0; i < 1000; i++) { if (ignite.affinity(CACHE_1).partition(i) == blockPartId) { ignite.cache(CACHE_1).put(i, i); c++; } } assertEquals(c, ignite.cache(CACHE_1).size()); startGridsMultiThreaded(GRIDS_CNT / 2, GRIDS_CNT / 2); // Prevent rebalancing to new nodes. for (Ignite ig0 : G.allGrids()) { TestRecordingCommunicationSpi.spi(ig0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { @Override public boolean apply(ClusterNode node, Message message) { if (message instanceof GridDhtPartitionDemandMessage) { assertTrue(node.order() <= GRIDS_CNT / 2); GridDhtPartitionDemandMessage msg = (GridDhtPartitionDemandMessage)message; return msg.groupId() == CU.cacheId(CACHE_1) || msg.groupId() == CU.cacheId(CACHE_2); } return false; } }); } ignite.cluster().setBaselineTopology(GRIDS_CNT); for (Ignite ig0 : G.allGrids()) { if (ig0.cluster().localNode().order() <= GRIDS_CNT / 2) continue; TestRecordingCommunicationSpi.spi(ig0).waitForBlocked(); } assertEquals(c, ignite.cache(CACHE_1).size()); assertEquals(c, ignite.cache(CACHE_1).size()); int i = 0; while(i < GRIDS_CNT / 2) { stopGrid(GRIDS_CNT / 2 + i); i++; } awaitPartitionMapExchange(); for (Ignite ig : G.allGrids()) { GridDhtLocalPartition locPart = dht(ig.cache(CACHE_1)).topology().localPartition(blockPartId); assertNotNull(locPart); assertTrue(locPart.state() == OWNING); } } finally { stopAllGrids(); } } /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { cleanPersistenceDir(); } /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { cleanPersistenceDir(); } /** */ public static class TestAffinityFunction extends RendezvousAffinityFunction { /** */ public TestAffinityFunction() { } /** */ public TestAffinityFunction(boolean exclNeighbors) { 
super(exclNeighbors); } /** */ public TestAffinityFunction(boolean exclNeighbors, int parts) { super(exclNeighbors, parts); } /** */ public TestAffinityFunction(int parts, @Nullable IgniteBiPredicate<ClusterNode, ClusterNode> backupFilter) { super(parts, backupFilter); } /** {@inheritDoc} */ @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { int parts = partitions(); List<List<ClusterNode>> assignments = new ArrayList<>(parts); Map<UUID, Collection<ClusterNode>> neighborhoodCache = isExcludeNeighbors() ? GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null; List<ClusterNode> nodes = affCtx.currentTopologySnapshot(); Map<Object, List<ClusterNode>> nodesByGrp = U.newHashMap(2); for (ClusterNode node : nodes) { Object grp = node.attribute(GRP_ATTR); List<ClusterNode> grpNodes = nodesByGrp.get(grp); if (grpNodes == null) nodesByGrp.put(grp, (grpNodes = new ArrayList<>())); grpNodes.add(node); } boolean split = nodesByGrp.size() == 2; for (int i = 0; i < parts; i++) { List<ClusterNode> partAssignment = assignPartition(i, split ? nodesByGrp.get(i % 2 == 0 ? EVEN_GRP : ODD_GRP) : nodes, affCtx.backups(), neighborhoodCache); assignments.add(partAssignment); } return assignments; } } } {noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)