Repository: ignite Updated Branches: refs/heads/master 2bb773a8c -> 5e6e4e5f7
IGNITE-8476 AssertionError exception occurs when trying to remove node from baseline under loading Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5e6e4e5f Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5e6e4e5f Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5e6e4e5f Branch: refs/heads/master Commit: 5e6e4e5f7c6d24c81cf676623ef4d5550a554f04 Parents: 2bb773a Author: Ivan Rakov <[email protected]> Authored: Thu May 31 16:56:14 2018 +0300 Committer: Ivan Rakov <[email protected]> Committed: Thu May 31 16:56:14 2018 +0300 ---------------------------------------------------------------------- .../apache/ignite/internal/IgniteKernal.java | 18 +- .../ignite/internal/IgniteNodeAttributes.java | 3 + .../processors/cache/CacheGroupContext.java | 13 +- .../processors/cache/ClusterCachesInfo.java | 58 +- .../processors/cache/GridCacheProcessor.java | 1 + ...lientAffinityAssignmentWithBaselineTest.java | 974 +++++++++++++++++++ .../ignite/testsuites/IgnitePdsTestSuite2.java | 2 + 7 files changed, 1059 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6e4e5f/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index c16af70..0171c31 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -105,9 +105,6 @@ import org.apache.ignite.internal.binary.BinaryMarshaller; import org.apache.ignite.internal.binary.BinaryUtils; import org.apache.ignite.internal.cluster.ClusterGroupAdapter; import 
org.apache.ignite.internal.cluster.IgniteClusterEx; -import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; -import org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.managers.GridManager; import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManager; import org.apache.ignite.internal.managers.collision.GridCollisionManager; @@ -121,6 +118,7 @@ import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.marshaller.optimized.OptimizedMarshaller; import org.apache.ignite.internal.processors.GridProcessor; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; import org.apache.ignite.internal.processors.cache.CacheConfigurationOverride; @@ -136,11 +134,13 @@ import org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsi import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; import org.apache.ignite.internal.processors.closure.GridClosureProcessor; import org.apache.ignite.internal.processors.cluster.ClusterProcessor; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor; import org.apache.ignite.internal.processors.cluster.IGridClusterStateProcessor; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; +import 
org.apache.ignite.internal.processors.failure.FailureProcessor; import org.apache.ignite.internal.processors.hadoop.Hadoop; import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter; import org.apache.ignite.internal.processors.job.GridJobProcessor; @@ -201,10 +201,10 @@ import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.mxbean.ClusterMetricsMXBean; import org.apache.ignite.mxbean.IgniteMXBean; import org.apache.ignite.mxbean.StripedExecutorMXBean; -import org.apache.ignite.mxbean.WorkersControlMXBean; import org.apache.ignite.mxbean.ThreadPoolMXBean; -import org.apache.ignite.mxbean.TransactionsMXBean; import org.apache.ignite.mxbean.TransactionMetricsMxBean; +import org.apache.ignite.mxbean.TransactionsMXBean; +import org.apache.ignite.mxbean.WorkersControlMXBean; import org.apache.ignite.plugin.IgnitePlugin; import org.apache.ignite.plugin.PluginNotFoundException; import org.apache.ignite.plugin.PluginProvider; @@ -1583,7 +1583,13 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Stick in network context into attributes. add(ATTR_IPS, (ips.isEmpty() ? "" : ips)); - add(ATTR_MACS, (macs.isEmpty() ? "" : macs)); + + Map<String, ?> userAttrs = configuration().getUserAttributes(); + + if (userAttrs != null && userAttrs.get(IgniteNodeAttributes.ATTR_MACS_OVERRIDE) != null) + add(ATTR_MACS, (Serializable)userAttrs.get(IgniteNodeAttributes.ATTR_MACS_OVERRIDE)); + else + add(ATTR_MACS, (macs.isEmpty() ? "" : macs)); // Stick in some system level attributes add(ATTR_JIT_NAME, U.getCompilerMx() == null ? 
"" : U.getCompilerMx().getName()); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6e4e5f/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java index 073369f..6a4beeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteNodeAttributes.java @@ -120,6 +120,9 @@ public final class IgniteNodeAttributes { /** Internal attribute name constant. */ public static final String ATTR_MACS = ATTR_PREFIX + ".macs"; + /** Allows to override {@link #ATTR_MACS} by adding this attribute in the user attributes. */ + public static final String ATTR_MACS_OVERRIDE = "override." + ATTR_MACS; + /** Internal attribute name constant. */ public static final String ATTR_PHY_RAM = ATTR_PREFIX + ".phy.ram"; http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6e4e5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index d1bdbb6..8a65038 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -133,6 +133,9 @@ public class CacheGroupContext { /** */ private final DataRegion dataRegion; + /** Persistence enabled flag. 
*/ + private final boolean persistenceEnabled; + /** */ private final CacheObjectContext cacheObjCtx; @@ -158,8 +161,8 @@ public class CacheGroupContext { private volatile boolean globalWalEnabled; /** - * @param grpId Group ID. * @param ctx Context. + * @param grpId Group ID. * @param rcvdFrom Node ID cache group was received from. * @param cacheType Cache type. * @param ccfg Cache configuration. @@ -169,6 +172,7 @@ public class CacheGroupContext { * @param freeList Free list. * @param reuseList Reuse list. * @param locStartVer Topology version when group was started on local node. + * @param persistenceEnabled Persistence enabled flag. * @param walEnabled Wal enabled flag. */ CacheGroupContext( @@ -183,7 +187,9 @@ public class CacheGroupContext { FreeList freeList, ReuseList reuseList, AffinityTopologyVersion locStartVer, - boolean walEnabled) { + boolean persistenceEnabled, + boolean walEnabled + ) { assert ccfg != null; assert dataRegion != null || !affNode; assert grpId != 0 : "Invalid group ID [cache=" + ccfg.getName() + ", grpName=" + ccfg.getGroupName() + ']'; @@ -200,6 +206,7 @@ public class CacheGroupContext { this.locStartVer = locStartVer; this.cacheType = cacheType; this.globalWalEnabled = walEnabled; + this.persistenceEnabled = persistenceEnabled; this.localWalEnabled = true; persistGlobalWalState(walEnabled); @@ -912,7 +919,7 @@ public class CacheGroupContext { * @return Persistence enabled flag. 
*/ public boolean persistenceEnabled() { - return dataRegion != null && dataRegion.config().isPersistenceEnabled(); + return persistenceEnabled; } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6e4e5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java index 70ff110..94f8a27 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ClusterCachesInfo.java @@ -38,10 +38,12 @@ import org.apache.ignite.cache.CacheExistsException; import org.apache.ignite.cache.QueryEntity; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; import org.apache.ignite.internal.GridCachePluginContext; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteNodeAttributes; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; @@ -60,6 +62,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteProductVersion; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.jdk.JdkMarshaller; import org.apache.ignite.plugin.CachePluginContext; import org.apache.ignite.plugin.CachePluginProvider; import 
org.apache.ignite.plugin.PluginProvider; @@ -1752,7 +1755,7 @@ class ClusterCachesInfo { Map<String, Integer> caches = Collections.singletonMap(startedCacheCfg.getName(), cacheId); - boolean persistent = CU.isPersistentCache(startedCacheCfg, ctx.config().getDataStorageConfiguration()); + boolean persistent = resolvePersistentFlag(exchActions, startedCacheCfg); CacheGroupDescriptor grpDesc = new CacheGroupDescriptor( startedCacheCfg, @@ -1782,6 +1785,59 @@ class ClusterCachesInfo { } /** + * Resolves persistent flag for new cache group descriptor. + * + * @param exchActions Optional exchange actions to update if new group was added. + * @param startedCacheCfg Started cache configuration. + */ + private boolean resolvePersistentFlag(@Nullable ExchangeActions exchActions, CacheConfiguration<?, ?> startedCacheCfg) { + if (!ctx.clientNode()) { + // On server, we always can determine whether cache is persistent by local storage configuration. + return CU.isPersistentCache(startedCacheCfg, ctx.config().getDataStorageConfiguration()); + } + else if (exchActions == null) { + // It's either client local join event or cache is statically configured on another node. + // No need to resolve on client - we'll anyway receive group descriptor from server with correct flag. + return false; + } + else { + // Dynamic cache start. Initiator of the start may not have known whether cache should be persistent. + // On client, we should peek attributes of any affinity server node to get data storage configuration. 
+ Collection<ClusterNode> aliveSrvNodes = ctx.discovery().aliveServerNodes(); + + assert !aliveSrvNodes.isEmpty() : "No alive server nodes"; + + for (ClusterNode srvNode : aliveSrvNodes) { + if (CU.affinityNode(srvNode, startedCacheCfg.getNodeFilter())) { + Object dsCfgBytes = srvNode.attribute(IgniteNodeAttributes.ATTR_DATA_STORAGE_CONFIG); + + if (dsCfgBytes instanceof byte[]) { + try { + DataStorageConfiguration crdDsCfg = new JdkMarshaller().unmarshal( + (byte[])dsCfgBytes, U.resolveClassLoader(ctx.config())); + + return CU.isPersistentCache(startedCacheCfg, crdDsCfg); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unmarshal remote data storage configuration [remoteNode=" + + srvNode + ", cacheName=" + startedCacheCfg.getName() + "]", e); + } + } + else { + U.error(log, "Remote marshalled data storage configuration is absent [remoteNode=" + srvNode + + ", cacheName=" + startedCacheCfg.getName() + ", dsCfg=" + dsCfgBytes + "]"); + } + } + } + + U.error(log, "Failed to find affinity server node with data storage configuration for starting cache " + + "[cacheName=" + startedCacheCfg.getName() + ", aliveSrvNodes=" + aliveSrvNodes + "]"); + + return false; + } + } + + /** * @param ccfg Cache configuration to start. * @throws IgniteCheckedException If failed. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6e4e5f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index 29bba99..7d68fc6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -2036,6 +2036,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { freeList, reuseList, exchTopVer, + desc.persistenceEnabled(), desc.walEnabled() ); http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6e4e5f/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java new file mode 100644 index 0000000..15ec415 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/baseline/ClientAffinityAssignmentWithBaselineTest.java @@ -0,0 +1,974 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.ignite.internal.processors.cache.persistence.baseline; + +import java.io.File; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.LockSupport; +import javax.cache.CacheException; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.BaselineNode; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.cluster.ClusterTopologyException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteNodeAttributes; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; +import 
org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionConcurrency; +import org.apache.ignite.transactions.TransactionIsolation; + +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.DFLT_STORE_DIR; + +/** + * Checks that client affinity assignment cache is calculated correctly regardless of current baseline topology. + */ +public class ClientAffinityAssignmentWithBaselineTest extends GridCommonAbstractTest { + /** Nodes count. */ + private static final int DEFAULT_NODES_COUNT = 5; + + /** Tx cache name. */ + private static final String PARTITIONED_TX_CACHE_NAME = "p-tx-cache"; + + /** Tx cache name with shifted affinity. */ + private static final String PARTITIONED_TX_PRIM_SYNC_CACHE_NAME = "prim-sync"; + + /** Tx cache name from client static configuration. */ + private static final String PARTITIONED_TX_CLIENT_CACHE_NAME = "p-tx-client-cache"; + + /** Atomic cache name. */ + private static final String PARTITIONED_ATOMIC_CACHE_NAME = "p-atomic-cache"; + + /** Tx cache name. */ + private static final String REPLICATED_TX_CACHE_NAME = "r-tx-cache"; + + /** Atomic cache name. */ + private static final String REPLICATED_ATOMIC_CACHE_NAME = "r-atomic-cache"; + + /** Client grid name. */ + private static final String CLIENT_GRID_NAME = "client"; + + /** Flaky node name */ + private static final String FLAKY_NODE_NAME = "flaky"; + + /** Entries. */ + private static final int ENTRIES = 3_000; + + /** Flaky node wal path. */ + public static final String FLAKY_WAL_PATH = "flakywal"; + + /** Flaky node wal archive path. */ + public static final String FLAKY_WAL_ARCHIVE_PATH = "flakywalarchive"; + + /** Flaky node storage path. 
*/ + public static final String FLAKY_STORAGE_PATH = "flakystorage"; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + if (igniteInstanceName.startsWith(CLIENT_GRID_NAME)) { + // Intentionally skipping data storage in client configuration. + cfg.setClientMode(true); + } + else { + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setMaxSize(200 * 1024 * 1024) + ) + ); + } + + if (igniteInstanceName.contains(FLAKY_NODE_NAME)) { + File store = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false); + + cfg.getDataStorageConfiguration().setWalPath(new File(store, FLAKY_WAL_PATH).getAbsolutePath()); + cfg.getDataStorageConfiguration().setWalArchivePath(new File(store, FLAKY_WAL_ARCHIVE_PATH).getAbsolutePath()); + cfg.getDataStorageConfiguration().setStoragePath(new File(store, FLAKY_STORAGE_PATH).getAbsolutePath()); + } + + cfg.setConsistentId(igniteInstanceName); + + List<CacheConfiguration> srvConfigs = new ArrayList<>(); + srvConfigs.add(cacheConfig(PARTITIONED_TX_CACHE_NAME)); + srvConfigs.add(cacheConfig(PARTITIONED_TX_PRIM_SYNC_CACHE_NAME)); + srvConfigs.add(cacheConfig(REPLICATED_ATOMIC_CACHE_NAME)); + + List<CacheConfiguration> clientConfigs = new ArrayList<>(srvConfigs); + + // Skip some configs in client static configuration to check that clients receive correct cache descriptors. + srvConfigs.add(cacheConfig(PARTITIONED_ATOMIC_CACHE_NAME)); + srvConfigs.add(cacheConfig(REPLICATED_TX_CACHE_NAME)); + + // Skip config in server static configuration to check that caches received on client join start correctly. 
+ clientConfigs.add(cacheConfig(PARTITIONED_TX_CLIENT_CACHE_NAME)); + + if (igniteInstanceName.startsWith(CLIENT_GRID_NAME)) + cfg.setCacheConfiguration(clientConfigs.toArray(new CacheConfiguration[clientConfigs.size()])); + else + cfg.setCacheConfiguration(srvConfigs.toArray(new CacheConfiguration[srvConfigs.size()])); + + // Enforce different mac adresses to emulate distributed environment by default. + cfg.setUserAttributes(Collections.singletonMap( + IgniteNodeAttributes.ATTR_MACS_OVERRIDE, UUID.randomUUID().toString())); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + cleanPersistenceDir(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + } + + /** + * @param cacheName Cache name. + */ + private CacheConfiguration<Integer, String> cacheConfig(String cacheName) { + CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(); + + if (PARTITIONED_ATOMIC_CACHE_NAME.equals(cacheName)) { + cfg.setName(PARTITIONED_ATOMIC_CACHE_NAME); + cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + cfg.setBackups(2); + } + else if (PARTITIONED_TX_CACHE_NAME.equals(cacheName)) { + cfg.setName(PARTITIONED_TX_CACHE_NAME); + cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + cfg.setBackups(2); + } + else if (PARTITIONED_TX_CLIENT_CACHE_NAME.equals(cacheName)) { + cfg.setName(PARTITIONED_TX_CLIENT_CACHE_NAME); + cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + cfg.setBackups(2); + } + else if 
(PARTITIONED_TX_PRIM_SYNC_CACHE_NAME.equals(cacheName)) { + cfg.setName(PARTITIONED_TX_PRIM_SYNC_CACHE_NAME); + cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC); + cfg.setAffinity(new RendezvousAffinityFunction(false, 41)); // To break collocation. + cfg.setBackups(2); + } + else if (REPLICATED_ATOMIC_CACHE_NAME.equals(cacheName)) { + cfg.setName(REPLICATED_ATOMIC_CACHE_NAME); + cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + cfg.setCacheMode(CacheMode.REPLICATED); + } + else if (REPLICATED_TX_CACHE_NAME.equals(cacheName)) { + cfg.setName(REPLICATED_TX_CACHE_NAME); + cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + cfg.setAffinity(new RendezvousAffinityFunction(false, 32)); + cfg.setCacheMode(CacheMode.REPLICATED); + } + else + throw new IllegalArgumentException("Unexpected cache name"); + + return cfg; + } + + /** + * + */ + public void testPartitionedAtomicCache() throws Exception { + testChangingBaselineDown(PARTITIONED_ATOMIC_CACHE_NAME, false); + } + + /** + * + */ + public void testPartitionedTxCache() throws Exception { + testChangingBaselineDown(PARTITIONED_TX_CACHE_NAME, false); + } + + /** + * Test that activation after client join won't break cache. + */ + public void testLateActivation() throws Exception { + testChangingBaselineDown(PARTITIONED_TX_CACHE_NAME, true); + } + + /** + * + */ + public void testReplicatedAtomicCache() throws Exception { + testChangingBaselineDown(REPLICATED_ATOMIC_CACHE_NAME, false); + } + + /** + * + */ + public void testReplicatedTxCache() throws Exception { + testChangingBaselineDown(REPLICATED_TX_CACHE_NAME, false); + } + + /** + * Tests that changing baseline down under load won't break cache. 
+ */ + private void testChangingBaselineDown(String cacheName, boolean lateActivation) throws Exception { + IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT); + + IgniteEx client1 = null; + IgniteEx client2 = null; + + if (lateActivation) { + client1 = (IgniteEx)startGrid("client1"); + client2 = (IgniteEx)startGrid("client2"); + } + else + ig0.cluster().active(true); + + AtomicBoolean stopLoad = new AtomicBoolean(false); + + AtomicReference<Throwable> loadError = new AtomicReference<>(null); + + if (lateActivation) + ig0.cluster().active(true); + + IgniteCache<Integer, String> cache = ig0.cache(cacheName); + + System.out.println("### Starting preloading"); + + for (int i = 0; i < ENTRIES; i++) { + ThreadLocalRandom r = ThreadLocalRandom.current(); + + byte[] randBytes = new byte[r.nextInt(10, 100)]; + + cache.put(r.nextInt(ENTRIES), new String(randBytes)); + } + + System.out.println("### Preloading is finished"); + + if (!lateActivation) { + client1 = (IgniteEx)startGrid("client1"); + client2 = (IgniteEx)startGrid("client2"); + } + + ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>(); + + startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker); + startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker); + startSimpleLoadThread(client1, cacheName, stopLoad, loadError, threadProgressTracker); + startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker); + startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker); + startTxLoadThread(client2, cacheName, stopLoad, loadError, threadProgressTracker); + + awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker); + + List<BaselineNode> fullBlt = new ArrayList<>(); + for (int i = 0; i < DEFAULT_NODES_COUNT; i++) + fullBlt.add(grid(i).localNode()); + + stopGrid(DEFAULT_NODES_COUNT - 1, true); + stopGrid(DEFAULT_NODES_COUNT - 2, true); + + awaitProgressInAllLoaders(10_000, 
loadError, threadProgressTracker); + + tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 1, loadError, threadProgressTracker); + tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 2, loadError, threadProgressTracker); + + stopLoad.set(true); + } + + /** + * Tests that rejoin of baseline node with clear LFS under load won't break cache. + */ + public void testRejoinWithCleanLfs() throws Exception { + IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT - 1); + startGrid("flaky"); + + ig0.cluster().active(true); + + AtomicBoolean stopLoad = new AtomicBoolean(false); + + AtomicReference<Throwable> loadError = new AtomicReference<>(null); + + IgniteCache<Integer, String> cache1 = ig0.cache(PARTITIONED_ATOMIC_CACHE_NAME); + IgniteCache<Integer, String> cache2 = ig0.cache(PARTITIONED_TX_CACHE_NAME); + IgniteCache<Integer, String> cache3 = ig0.cache(REPLICATED_ATOMIC_CACHE_NAME); + IgniteCache<Integer, String> cache4 = ig0.cache(REPLICATED_TX_CACHE_NAME); + + System.out.println("### Starting preloading"); + + for (int i = 0; i < ENTRIES; i++) { + ThreadLocalRandom r = ThreadLocalRandom.current(); + + cache1.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)])); + cache2.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)])); + cache3.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)])); + cache4.put(r.nextInt(ENTRIES), new String(new byte[r.nextInt(10, 100)])); + } + + System.out.println("### Preloading is finished"); + + IgniteEx client1 = (IgniteEx)startGrid("client1"); + IgniteEx client2 = (IgniteEx)startGrid("client2"); + + ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>(); + + startSimpleLoadThread(client1, PARTITIONED_ATOMIC_CACHE_NAME, stopLoad, loadError, threadProgressTracker); + startSimpleLoadThread(client1, PARTITIONED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker); + startSimpleLoadThread(client1, REPLICATED_ATOMIC_CACHE_NAME, stopLoad, loadError, 
threadProgressTracker); + startTxLoadThread(client2, PARTITIONED_ATOMIC_CACHE_NAME, stopLoad, loadError, threadProgressTracker); + startTxLoadThread(client2, PARTITIONED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker); + startTxLoadThread(client2, REPLICATED_TX_CACHE_NAME, stopLoad, loadError, threadProgressTracker); + + awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker); + + stopGrid("flaky"); + + awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker); + + File store = U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_STORE_DIR, false); + + U.delete(new File(store, FLAKY_WAL_PATH)); + U.delete(new File(store, FLAKY_WAL_ARCHIVE_PATH)); + U.delete(new File(store, FLAKY_STORAGE_PATH)); + + startGrid("flaky"); + + System.out.println("### Starting rebalancing after flaky node join"); + waitForRebalancing(); + System.out.println("### Rebalancing is finished after flaky node join"); + + awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker); + + stopLoad.set(true); + } + + /** + * Test that changing baseline down under cross-cache txs load won't break cache. 
+ */ + public void testCrossCacheTxs() throws Exception { + IgniteEx ig0 = (IgniteEx)startGrids(DEFAULT_NODES_COUNT); + + ig0.cluster().active(true); + + AtomicBoolean stopLoad = new AtomicBoolean(false); + + AtomicReference<Throwable> loadError = new AtomicReference<>(null); + + String cacheName1 = PARTITIONED_TX_CACHE_NAME; + String cacheName2 = PARTITIONED_TX_PRIM_SYNC_CACHE_NAME; + + IgniteCache<Integer, String> cache1 = ig0.cache(PARTITIONED_TX_CACHE_NAME); + IgniteCache<Integer, String> cache2 = ig0.cache(PARTITIONED_TX_PRIM_SYNC_CACHE_NAME); + + System.out.println("### Starting preloading"); + + for (int i = 0; i < ENTRIES; i++) { + ThreadLocalRandom r = ThreadLocalRandom.current(); + + byte[] randBytes1 = new byte[r.nextInt(10, 100)]; + byte[] randBytes2 = new byte[r.nextInt(10, 100)]; + + cache1.put(r.nextInt(ENTRIES), new String(randBytes1)); + cache2.put(r.nextInt(ENTRIES), new String(randBytes2)); + } + + System.out.println("### Preloading is finished"); + + IgniteEx client1 = (IgniteEx)startGrid("client1"); + IgniteEx client2 = (IgniteEx)startGrid("client2"); + + ConcurrentMap<Long, Long> threadProgressTracker = new ConcurrentHashMap<>(); + + startCrossCacheTxLoadThread(client1, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker); + startCrossCacheTxLoadThread(client1, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker); + startCrossCacheTxLoadThread(client1, cacheName2, cacheName1, stopLoad, loadError, threadProgressTracker); + startCrossCacheTxLoadThread(client2, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker); + startCrossCacheTxLoadThread(client2, cacheName1, cacheName2, stopLoad, loadError, threadProgressTracker); + startCrossCacheTxLoadThread(client2, cacheName2, cacheName1, stopLoad, loadError, threadProgressTracker); + + awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker); + + List<BaselineNode> fullBlt = new ArrayList<>(); + for (int i = 0; i < DEFAULT_NODES_COUNT; i++) + 
fullBlt.add(grid(i).localNode()); + + stopGrid(DEFAULT_NODES_COUNT - 1, true); + stopGrid(DEFAULT_NODES_COUNT - 2, true); + + awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker); + + tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 1, loadError, threadProgressTracker); + tryChangeBaselineDown(ig0, fullBlt, DEFAULT_NODES_COUNT - 2, loadError, threadProgressTracker); + + stopLoad.set(true); + } + + /** + * Tests that join of non-baseline node while long transactions are running won't break dynamically started cache. + */ + public void testDynamicCacheLongTransactionNodeStart() throws Exception { + IgniteEx ig0 = (IgniteEx)startGrids(4); + + ig0.cluster().active(true); + + IgniteEx client = (IgniteEx)startGrid("client"); + + CacheConfiguration<Integer, String> dynamicCacheCfg = cacheConfig(REPLICATED_TX_CACHE_NAME); + dynamicCacheCfg.setName("dyn"); + + IgniteCache<Integer, String> dynamicCache = client.getOrCreateCache(dynamicCacheCfg); + + for (int i = 0; i < ENTRIES; i++) + dynamicCache.put(i, "abacaba" + i); + + AtomicBoolean releaseTx = new AtomicBoolean(false); + CountDownLatch allTxsDoneLatch = new CountDownLatch(10); + + for (int i = 0; i < 10; i++) { + final int i0 = i; + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try (Transaction tx = client.transactions().txStart(TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.REPEATABLE_READ)) { + dynamicCache.put(i0, "txtxtxtx" + i0); + + while (!releaseTx.get()) + LockSupport.parkNanos(1_000_000); + + tx.commit(); + + System.out.println("Tx #" + i0 + " committed"); + } + catch (Throwable t) { + System.out.println("Tx #" + i0 + " failed"); + + t.printStackTrace(); + } + finally { + allTxsDoneLatch.countDown(); + } + } + }); + } + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + startGrid(4); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + + U.sleep(1_000); + + releaseTx.set(true); + + 
allTxsDoneLatch.await(); + + for (int i = 0; i < 10_000; i++) + assertEquals("txtxtxtx" + (i % 10), dynamicCache.get(i % 10)); + } + + /** + * Tests that if a dynamic cache has no affinity nodes at the moment of start, + * it will still work correctly once affinity nodes appear. Currently disabled: the first statement fails the test with message IGNITE-8652. + */ + public void testDynamicCacheStartNoAffinityNodes() throws Exception { + fail("IGNITE-8652"); + + IgniteEx ig0 = startGrid(0); + + ig0.cluster().active(true); + + IgniteEx client = (IgniteEx)startGrid("client"); + + CacheConfiguration<Integer, String> dynamicCacheCfg = new CacheConfiguration<Integer, String>() + .setName("dyn") + .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL) + .setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC) + .setAffinity(new RendezvousAffinityFunction(false, 32)) + .setBackups(2) + .setNodeFilter(new ConsistentIdNodeFilter((Serializable)ig0.localNode().consistentId())); + + IgniteCache<Integer, String> dynamicCache = client.getOrCreateCache(dynamicCacheCfg); + + for (int i = 1; i < 4; i++) + startGrid(i); + + resetBaselineTopology(); + + for (int i = 0; i < ENTRIES; i++) + dynamicCache.put(i, "abacaba" + i); + + AtomicBoolean releaseTx = new AtomicBoolean(false); + CountDownLatch allTxsDoneLatch = new CountDownLatch(10); + + for (int i = 0; i < 10; i++) { + final int i0 = i; + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try (Transaction tx = client.transactions().txStart(TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.REPEATABLE_READ)) { + dynamicCache.put(i0, "txtxtxtx" + i0); + + while (!releaseTx.get()) + LockSupport.parkNanos(1_000_000); + + tx.commit(); + + System.out.println("Tx #" + i0 + " committed"); + } + catch (Throwable t) { + System.out.println("Tx #" + i0 + " failed"); + + t.printStackTrace(); + } + finally { + allTxsDoneLatch.countDown(); + } + } + }); + } + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + startGrid(4); + } + catch (Exception e) 
{ + e.printStackTrace(); + } + } + }); + + U.sleep(1_000); + + releaseTx.set(true); + + allTxsDoneLatch.await(); + + for (int i = 0; i < 10_000; i++) + assertEquals("txtxtxtx" + (i % 10), dynamicCache.get(i % 10)); + } + + /** + * Tests that the join of a non-baseline node while long transactions are running won't break a cache started on client join. + */ + public void testClientJoinCacheLongTransactionNodeStart() throws Exception { + IgniteEx ig0 = (IgniteEx)startGrids(4); + + ig0.cluster().active(true); + + IgniteEx client = (IgniteEx)startGrid("client"); + + IgniteCache<Integer, String> clientJoinCache = client.cache(PARTITIONED_TX_CLIENT_CACHE_NAME); + + for (int i = 0; i < ENTRIES; i++) + clientJoinCache.put(i, "abacaba" + i); + + AtomicBoolean releaseTx = new AtomicBoolean(false); + CountDownLatch allTxsDoneLatch = new CountDownLatch(10); + + for (int i = 0; i < 10; i++) { + final int i0 = i; + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try (Transaction tx = client.transactions().txStart(TransactionConcurrency.PESSIMISTIC, + TransactionIsolation.REPEATABLE_READ)) { + clientJoinCache.put(i0, "txtxtxtx" + i0); + + while (!releaseTx.get()) + LockSupport.parkNanos(1_000_000); + + tx.commit(); + + System.out.println("Tx #" + i0 + " committed"); + } + catch (Throwable t) { + System.out.println("Tx #" + i0 + " failed"); + + t.printStackTrace(); + } + finally { + allTxsDoneLatch.countDown(); + } + } + }); + } + + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + try { + startGrid(4); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + + U.sleep(1_000); + + releaseTx.set(true); + + allTxsDoneLatch.await(); + + for (int i = 0; i < 10_000; i++) + assertEquals("txtxtxtx" + (i % 10), clientJoinCache.get(i % 10)); + } + + /** + * @param ig0 Ignite node through which the new baseline topology is set. + * @param fullBlt Initial BLT list; its first {@code newBaselineSize} nodes become the new baseline. + * @param newBaselineSize New baseline size. + * @param threadProgressTracker Thread progress tracker, checked for loader progress after rebalancing. 
+ */ + private void tryChangeBaselineDown( + IgniteEx ig0, + List<BaselineNode> fullBlt, + int newBaselineSize, + AtomicReference<Throwable> loadError, + ConcurrentMap<Long, Long> threadProgressTracker + ) throws Exception { + System.out.println("### Changing BLT: " + (newBaselineSize + 1) + " -> " + newBaselineSize); + ig0.cluster().setBaselineTopology(fullBlt.subList(0, newBaselineSize)); + + System.out.println("### Starting rebalancing after BLT change: " + (newBaselineSize + 1) + " -> " + newBaselineSize); + waitForRebalancing(); + System.out.println("### Rebalancing is finished after BLT change: " + (newBaselineSize + 1) + " -> " + newBaselineSize); + + awaitProgressInAllLoaders(10_000, loadError, threadProgressTracker); + + if (loadError.get() != null) { + loadError.get().printStackTrace(); + + fail("Unexpected error in load thread: " + loadError.get().toString()); + } + } + + /** + * @param ig Ignite instance whose cache is put under load. + * @param cacheName Name of the cache that receives random put, remove and get operations. + * @param stopFlag Stop flag; the loader runs until it is set, and sets it itself on an unexpected error. + * @param loadError Load error reference; receives the unexpected error if no error was stored before. + * @param threadProgressTracker Progress tracker: loader thread ID mapped to the count of completed operations. + */ + private void startSimpleLoadThread( + IgniteEx ig, + String cacheName, + AtomicBoolean stopFlag, + AtomicReference<Throwable> loadError, + ConcurrentMap<Long, Long> threadProgressTracker + ) { + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + ThreadLocalRandom r = ThreadLocalRandom.current(); + + IgniteCache<Integer, String> cache = ig.cache(cacheName); + + try { + while (!stopFlag.get()) { + try { + int op = r.nextInt(3); + + switch (op) { + case 0: + byte[] randBytes = new byte[r.nextInt(10, 100)]; + + cache.put(r.nextInt(ENTRIES), new String(randBytes)); + + break; + case 1: + cache.remove(r.nextInt(ENTRIES)); + + break; + case 2: + cache.get(r.nextInt(ENTRIES)); + + break; + } + + threadProgressTracker.compute(Thread.currentThread().getId(), + (tId, ops) -> ops == null ? 
1 : ops + 1); + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) + ((ClusterTopologyException)e.getCause()).retryReadyFuture().get(); + } + catch (ClusterTopologyException e) { + e.retryReadyFuture().get(); + } + } + } + catch (Throwable t) { + loadError.compareAndSet(null, t); + + stopFlag.set(true); + } + } + }); + } + + /** + * @param ig Ignite instance used to start transactions and to obtain the cache. + * @param cacheName Name of the cache in which each transaction swaps the values of two random existing keys. + * @param stopFlag Stop flag; the loader runs until it is set, and sets it itself on an unexpected error. + * @param loadError Load error reference; receives the unexpected error if no error was stored before. + * @param threadProgressTracker Progress tracker: loader thread ID mapped to the count of committed or rolled back transactions. + */ + private void startTxLoadThread( + IgniteEx ig, + String cacheName, + AtomicBoolean stopFlag, + AtomicReference<Throwable> loadError, + ConcurrentMap<Long, Long> threadProgressTracker + ) { + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + ThreadLocalRandom r = ThreadLocalRandom.current(); + + IgniteCache<Integer, String> cache = ig.cache(cacheName); + + boolean pessimistic = r.nextBoolean(); + + boolean rollback = r.nextBoolean(); + + try { + while (!stopFlag.get()) { + try (Transaction tx = ig.transactions().txStart( + pessimistic ? TransactionConcurrency.PESSIMISTIC : TransactionConcurrency.OPTIMISTIC, + TransactionIsolation.REPEATABLE_READ + )) { + int key1 = -1; + String val1 = null; + while (val1 == null) { + key1 = r.nextInt(ENTRIES); + val1 = cache.get(key1); + } + + int key2 = -1; + String val2 = null; + while (val2 == null) { + key2 = r.nextInt(ENTRIES); + val2 = cache.get(key2); + } + + cache.put(key1, val2); + cache.put(key2, val1); + + if (rollback) + tx.rollback(); + else + tx.commit(); + + threadProgressTracker.compute(Thread.currentThread().getId(), + (tId, ops) -> ops == null ? 
1 : ops + 1); + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) + ((ClusterTopologyException)e.getCause()).retryReadyFuture().get(); + } + catch (ClusterTopologyException e) { + e.retryReadyFuture().get(); + } + } + } + catch (Throwable t) { + loadError.compareAndSet(null, t); + + stopFlag.set(true); + } + } + }); + } + + /** + * @param ig Ignite instance used to start transactions and to obtain both caches. + * @param cacheName1 Name of the first cache; a random existing entry of it receives the value read from the second cache. + * @param cacheName2 Name of the second cache; a random existing entry of it receives the value read from the first cache. + * @param stopFlag Stop flag; the loader runs until it is set, and sets it itself on an unexpected error. + * @param loadError Load error reference; receives the unexpected error if no error was stored before. + * @param threadProgressTracker Progress tracker: loader thread ID mapped to the count of committed or rolled back transactions. + */ + private void startCrossCacheTxLoadThread( + IgniteEx ig, + String cacheName1, + String cacheName2, + AtomicBoolean stopFlag, + AtomicReference<Throwable> loadError, + ConcurrentMap<Long, Long> threadProgressTracker + ) { + GridTestUtils.runAsync(new Runnable() { + @Override public void run() { + ThreadLocalRandom r = ThreadLocalRandom.current(); + + IgniteCache<Integer, String> cache1 = ig.cache(cacheName1); + IgniteCache<Integer, String> cache2 = ig.cache(cacheName2); + + boolean pessimistic = r.nextBoolean(); + + boolean rollback = r.nextBoolean(); + + try { + while (!stopFlag.get()) { + try (Transaction tx = ig.transactions().txStart( + pessimistic ? TransactionConcurrency.PESSIMISTIC : TransactionConcurrency.OPTIMISTIC, + TransactionIsolation.REPEATABLE_READ + )) { + int key1 = -1; + String val1 = null; + while (val1 == null) { + key1 = r.nextInt(ENTRIES); + val1 = cache1.get(key1); + } + + int key2 = -1; + String val2 = null; + while (val2 == null) { + key2 = r.nextInt(ENTRIES); + val2 = cache2.get(key2); + } + + cache1.put(key1, val2); + cache2.put(key2, val1); + + if (rollback) + tx.rollback(); + else + tx.commit(); + + threadProgressTracker.compute(Thread.currentThread().getId(), + (tId, ops) -> ops == null ? 
1 : ops + 1); + } + catch (CacheException e) { + if (e.getCause() instanceof ClusterTopologyException) + ((ClusterTopologyException)e.getCause()).retryReadyFuture().get(); + } + catch (ClusterTopologyException e) { + e.retryReadyFuture().get(); + } + } + } + catch (Throwable t) { + loadError.compareAndSet(null, t); + + stopFlag.set(true); + } + } + }); + } + + /** + * @param waitMs Maximum wait time in milliseconds; the test fails if some loader shows no progress within it. + * @param loadError Load error reference; the test fails as soon as it is seen to hold an error. + * @param threadProgressTracker Thread progress tracker; every counter present when the wait starts must change for the wait to succeed. + */ + private void awaitProgressInAllLoaders( + long waitMs, + AtomicReference<Throwable> loadError, + ConcurrentMap<Long, Long> threadProgressTracker + ) throws Exception { + Map<Long, Long> view1 = new HashMap<>(threadProgressTracker); + + long startTs = U.currentTimeMillis(); + + while (U.currentTimeMillis() < startTs + waitMs) { + Map<Long, Long> view2 = new HashMap<>(threadProgressTracker); + + if (loadError.get() != null) { + loadError.get().printStackTrace(); + + fail("Unexpected error in load thread: " + loadError.get().toString()); + } + + boolean frozenThreadExists = false; + + for (Map.Entry<Long, Long> entry : view1.entrySet()) { + if (entry.getValue().equals(view2.get(entry.getKey()))) + frozenThreadExists = true; + } + + if (!frozenThreadExists) + return; + + U.sleep(100); + } + + fail("No progress in load thread"); + } + + /** + * Accepts all nodes except one with specified consistent ID. + */ + private static class ConsistentIdNodeFilter implements IgnitePredicate<ClusterNode> { + /** Consistent ID of the node to reject. */ + private final Serializable consId0; + + /** + * @param consId0 Consistent ID of the node to reject. 
+ */ + public ConsistentIdNodeFilter(Serializable consId0) { + this.consId0 = consId0; + } + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return !node.consistentId().equals(consId0); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/5e6e4e5f/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java index faa2333..8add7ed 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgnitePdsTestSuite2.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.processors.cache.persistence.IgnitePdsRecovery import org.apache.ignite.internal.processors.cache.persistence.IgnitePersistentStoreDataStructuresTest; import org.apache.ignite.internal.processors.cache.persistence.LocalWalModeChangeDuringRebalancingSelfTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteAbsentEvictionNodeOutOfBaselineTest; +import org.apache.ignite.internal.processors.cache.persistence.baseline.ClientAffinityAssignmentWithBaselineTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteAllBaselineNodesOnlineFullApiSelfTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOfflineBaselineNodeFullApiSelfTest; import org.apache.ignite.internal.processors.cache.persistence.baseline.IgniteOnlineNodeOutOfBaselineFullApiSelfTest; @@ -77,6 +78,7 @@ public class IgnitePdsTestSuite2 extends TestSuite { suite.addTestSuite(IgniteAllBaselineNodesOnlineFullApiSelfTest.class); suite.addTestSuite(IgniteOfflineBaselineNodeFullApiSelfTest.class); suite.addTestSuite(IgniteOnlineNodeOutOfBaselineFullApiSelfTest.class); + 
suite.addTestSuite(ClientAffinityAssignmentWithBaselineTest.class); suite.addTestSuite(IgniteAbsentEvictionNodeOutOfBaselineTest.class); suite.addTestSuite(FileDownloaderTest.class);
