IGNITE-9694 Add tests to check that reading queries are not blocked on exchange events that don't change data visibility - Fixes #4926.
Signed-off-by: Dmitriy Govorukhin <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/3fae41b1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/3fae41b1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/3fae41b1 Branch: refs/heads/master Commit: 3fae41b1fce89f2f05ff9027cdc37ed84f3a70a0 Parents: 3a4167a Author: ibessonov <[email protected]> Authored: Mon Oct 22 14:59:19 2018 +0300 Committer: Dmitriy Govorukhin <[email protected]> Committed: Mon Oct 22 14:59:19 2018 +0300 ---------------------------------------------------------------------- .../ignite/internal/util/IgniteUtils.java | 12 +- .../ignite/internal/util/lang/GridFunc.java | 2 + .../distributed/CacheBlockOnGetAllTest.java | 196 +++ .../CacheBlockOnReadAbstractTest.java | 1277 ++++++++++++++++++ .../cache/distributed/CacheBlockOnScanTest.java | 73 + .../distributed/CacheBlockOnSingleGetTest.java | 190 +++ .../testframework/junits/GridAbstractTest.java | 33 + ...eBlockExchangeOnReadOperationsTestSuite.java | 51 + .../testsuites/IgniteCacheTestSuite7.java | 2 +- .../CacheBlockOnCreateDestoryIndexTest.java | 480 +++++++ .../distributed/CacheBlockOnSqlQueryTest.java | 131 ++ ...ockExchangeOnSqlReadOperationsTestSuite.java | 39 + 12 files changed, 2475 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index d10a7c7..e6f374a 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java 
@@ -8169,23 +8169,15 @@ public abstract class IgniteUtils { for (Class cls = obj.getClass(); cls != Object.class; cls = cls.getSuperclass()) { for (Field field : cls.getDeclaredFields()) { if (field.getName().equals(fieldName)) { - boolean accessible = field.isAccessible(); - field.setAccessible(true); - T val = (T)field.get(obj); - - if (!accessible) - field.setAccessible(false); - - return val; + return (T)field.get(obj); } } } } catch (Exception e) { - throw new IgniteException("Failed to get field value [fieldName=" + fieldName + ", obj=" + obj + ']', - e); + throw new IgniteException("Failed to get field value [fieldName=" + fieldName + ", obj=" + obj + ']', e); } throw new IgniteException("Failed to get field value [fieldName=" + fieldName + ", obj=" + obj + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java index ce5076b..3b3bbaa 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/lang/GridFunc.java @@ -94,6 +94,7 @@ import org.apache.ignite.internal.util.lang.gridfunc.TransformFilteringIterator; import org.apache.ignite.internal.util.lang.gridfunc.TransformMapView; import org.apache.ignite.internal.util.lang.gridfunc.TransformMapView2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; @@ -2172,6 +2173,7 @@ public class GridFunc { * @param t2 Second object in pair. * @param <T> Type of objects in pair. 
* @return Pair of objects. + * @deprecated Use {@link T2} instead. */ @Deprecated public static <T> IgnitePair<T> pair(@Nullable T t1, @Nullable T t2) { http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnGetAllTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnGetAllTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnGetAllTest.java new file mode 100644 index 0000000..084a431 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnGetAllTest.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.HashSet; +import java.util.Random; +import java.util.Set; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class CacheBlockOnGetAllTest extends CacheBlockOnReadAbstractTest { + + /** {@inheritDoc} */ + @Override @NotNull protected CacheReadBackgroundOperation<?, ?> getReadOperation() { + return new IntCacheReadBackgroundOperation() { + /** Random. */ + private Random random = new Random(); + + /** {@inheritDoc} */ + @Override public void doRead() { + Set<Integer> keys = new HashSet<>(); + + for (int i = 0; i < 500; i++) + keys.add(random.nextInt(entriesCount())); + + cache().getAll(keys); + } + }; + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testStopBaselineAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9915"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testStopBaselineAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9915"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testStopBaselineTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9915"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testStopBaselineTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9915"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = 
PARTITIONED) + @Override public void testCreateCacheAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testCreateCacheAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testCreateCacheTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testCreateCacheTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testDestroyCacheAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testDestroyCacheAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testDestroyCacheTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testDestroyCacheTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testStartServerAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** 
{@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testStartServerAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testStartServerTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testStartServerTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testStopServerAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testStopServerAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testStopServerTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testStopServerTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testUpdateBaselineTopologyAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void 
testUpdateBaselineTopologyAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testUpdateBaselineTopologyTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testUpdateBaselineTopologyTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnReadAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnReadAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnReadAbstractTest.java new file mode 100644 index 0000000..42b5df0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnReadAbstractTest.java @@ -0,0 +1,1277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Predicate; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteInterruptedException; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataRegionConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.events.EventType; +import org.apache.ignite.internal.IgniteEx; +import 
org.apache.ignite.internal.IgniteFutureTimeoutCheckedException; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch; +import org.apache.ignite.internal.processors.cache.ExchangeActions; +import org.apache.ignite.internal.processors.cache.ExchangeActions.CacheActionData; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteBiPredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public abstract class CacheBlockOnReadAbstractTest extends GridCommonAbstractTest { + /** Default 
cache entries count. */ + private static final int DFLT_CACHE_ENTRIES_CNT = 2 * 1024; + + /** Ip finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** List of baseline nodes started at the beginning of the test. */ + protected final List<IgniteEx> baseline = new CopyOnWriteArrayList<>(); + + /** List of server nodes started at the beginning of the test. */ + protected final List<IgniteEx> srvs = new CopyOnWriteArrayList<>(); + + /** List of client nodes started at the beginning of the test. */ + protected final List<IgniteEx> clients = new CopyOnWriteArrayList<>(); + + /** Start node in client mode. */ + private volatile boolean startNodesInClientMode; + + /** Latch that is used to wait until all required messages are blocked. */ + private volatile CountDownLatch cntFinishedReadOperations; + + /** Custom ip finder. */ + private volatile TcpDiscoveryIpFinder customIpFinder; + + /** + * Number of baseline servers to start before test. + * + * @see Params#baseline() + */ + protected int baselineServersCount() { + return currentTestParams().baseline(); + } + + /** + * Number of non-baseline servers to start before test. + * + * @see Params#servers() + */ + protected int serversCount() { + return currentTestParams().servers(); + } + + /** + * Number of clients to start before test. + * + * @see Params#clients() + */ + protected int clientsCount() { + return currentTestParams().clients(); + } + + /** + * Number of backups to configure in caches by default. + */ + protected int backupsCount() { + return Math.min(3, baselineServersCount() - 1); + } + + /** + * Number of milliseconds to warmup reading process. Used to lower fluctuations in run time. Might be 0. + * + * @see Params#warmup() + */ + protected long warmup() { + return currentTestParams().warmup(); + } + + /** + * Number of milliseconds to wait on the potentially blocking operation. 
+ * + * @see Params#timeout() + */ + protected long timeout() { + return currentTestParams().timeout(); + } + + /** + * Cache atomicity mode. + * + * @see Params#atomicityMode() + */ + protected CacheAtomicityMode atomicityMode() { + return currentTestParams().atomicityMode(); + } + + /** + * Cache mode. + * + * @see Params#cacheMode() + */ + protected CacheMode cacheMode() { + return currentTestParams().cacheMode(); + } + + /** + * Whether allowing {@link ClusterTopologyCheckedException} as the valid reading result or not. + * + * @see Params#allowException() + */ + protected boolean allowException() { + return currentTestParams().allowException(); + } + + /** + * @param startNodesInClientMode Start nodes in client mode. + */ + public void startNodesInClientMode(boolean startNodesInClientMode) { + this.startNodesInClientMode = startNodesInClientMode; + } + + /** List of baseline nodes started at the beginning of the test. */ + public List<? extends IgniteEx> baseline() { + return baseline; + } + + /** List of server nodes started at the beginning of the test. */ + public List<? extends IgniteEx> servers() { + return srvs; + } + + /** List of client nodes started at the beginning of the test. */ + public List<? extends IgniteEx> clients() { + return clients; + } + + /** + * Annotation to configure test methods in {@link CacheBlockOnReadAbstractTest}. Its values are used throughout + * test implementation. + */ + @Target(ElementType.METHOD) + @Retention(RetentionPolicy.RUNTIME) + public @interface Params { + /** + * Number of baseline servers to start before test. + */ + int baseline() default 3; + + /** + * Number of non-baseline servers to start before test. + */ + int servers() default 1; + + /** + * Number of clients to start before test. + */ + int clients() default 1; + + /** + * Number of milliseconds to warmup reading process. Used to lower fluctuations in run time. Might be 0. 
+ */ + long warmup() default 2000L; + + /** + * Number of milliseconds to wait on the potentially blocking operation. + */ + long timeout() default 3000L; + + /** + * Cache atomicity mode. + */ + CacheAtomicityMode atomicityMode(); + + /** + * Cache mode. + */ + CacheMode cacheMode(); + + /** + * Whether allowing {@link ClusterTopologyCheckedException} as the valid reading result or not. + */ + boolean allowException() default false; + } + + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + + cfg.setConsistentId(igniteInstanceName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(customIpFinder == null ? IP_FINDER : customIpFinder); + + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + + cfg.setDataStorageConfiguration( + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + ) + ); + + cfg.setClientMode(startNodesInClientMode); + + return cfg; + } + + /** {@inheritDoc} */ + @Override public void beforeTest() throws Exception { + stopAllGrids(); + + cleanPersistenceDir(); + + // Checking prerequisites. + assertTrue("Positive timeout is required for the test.", timeout() > 0); + + assertTrue("No baseline servers were requested.", baselineServersCount() > 0); + + int idx = 0; + + // Start baseline nodes. + for (int i = 0; i < baselineServersCount(); i++) + baseline.add(startGrid(idx++)); + + // Activate cluster. + baseline.get(0).cluster().active(true); + + // Start server nodes in activated cluster. + for (int i = 0; i < serversCount(); i++) + srvs.add(startGrid(idx++)); + + // Start client nodes. 
+ startNodesInClientMode(true); + + customIpFinder = new TcpDiscoveryVmIpFinder(false) + .setAddresses( + Collections.singletonList("127.0.0.1:47500") + ); + + for (int i = 0; i < clientsCount(); i++) + clients.add(startGrid(idx++)); + + customIpFinder = null; + } + + /** {@inheritDoc} */ + @Override public void afterTest() throws Exception { + baseline.clear(); + + srvs.clear(); + + clients.clear(); + + grid(0).cluster().active(false); + + stopAllGrids(); + + cleanPersistenceDir(); + } + + + /** + * @throws Exception If failed. + */ + @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED) + public void testCreateCacheAtomicPartitioned() throws Exception { + testCreateCacheTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED) + public void testCreateCacheAtomicReplicated() throws Exception { + testCreateCacheTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + public void testCreateCacheTransactionalPartitioned() throws Exception { + testCreateCacheTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + public void testCreateCacheTransactionalReplicated() throws Exception { + doTest( + asMessagePredicate(CacheBlockOnReadAbstractTest::createCachePredicate), + () -> baseline.get(0).createCache(UUID.randomUUID().toString()) + ); + } + + /** + * @throws Exception If failed. + */ + @Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + public void testDestroyCacheAtomicPartitioned() throws Exception { + testDestroyCacheTransactionalReplicated(); + } + + /** + * @throws Exception If failed. 
+ */ + @Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = REPLICATED) + public void testDestroyCacheAtomicReplicated() throws Exception { + testDestroyCacheTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + public void testDestroyCacheTransactionalPartitioned() throws Exception { + testDestroyCacheTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + public void testDestroyCacheTransactionalReplicated() throws Exception { + List<String> cacheNames = new ArrayList<>(Arrays.asList( + UUID.randomUUID().toString(), + UUID.randomUUID().toString(), + UUID.randomUUID().toString()) + ); + + for (String cacheName : cacheNames) + baseline.get(0).createCache(cacheName); + + doTest( + asMessagePredicate(CacheBlockOnReadAbstractTest::destroyCachePredicate), + () -> baseline.get(0).destroyCache(cacheNames.remove(0)) + ); + } + /** + * @throws Exception If failed. + */ + public void _testStartClient() throws Exception { + startNodesInClientMode(true); + + doTest( + asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_JOINED), + () -> { + for (int i = 0; i < baselineServersCount() - 2; i++) + cntFinishedReadOperations.countDown(); + + customIpFinder = new TcpDiscoveryVmIpFinder(false) + .setAddresses( + Collections.singletonList("127.0.0.1:47500") + ); + + startGrid(UUID.randomUUID().toString()); + + customIpFinder = null; + } + ); + } + + /** + * @throws Exception If failed. 
+ */ + public void _testStopClient() throws Exception { + customIpFinder = new TcpDiscoveryVmIpFinder(false) + .setAddresses( + Collections.singletonList("127.0.0.1:47500") + ); + + startNodesInClientMode(true); + + for (int i = 0; i < 3; i++) + clients.add(startGrid(UUID.randomUUID().toString())); + + customIpFinder = null; + + doTest( + asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_LEFT), + () -> { + for (int i = 0; i < baselineServersCount() - 2; i++) + cntFinishedReadOperations.countDown(); + + stopGrid(clients.remove(clients.size() - 1).name()); + } + ); + } + + /** + * @throws Exception If failed. + */ + @Params(atomicityMode = ATOMIC, cacheMode = PARTITIONED) + public void testStartServerAtomicPartitioned() throws Exception { + testStartServerTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(atomicityMode = ATOMIC, cacheMode = REPLICATED) + public void testStartServerAtomicReplicated() throws Exception { + testStartServerTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + public void testStartServerTransactionalPartitioned() throws Exception { + testStartServerTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + public void testStartServerTransactionalReplicated() throws Exception { + startNodesInClientMode(false); + + doTest( + asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_JOINED), + () -> startGrid(UUID.randomUUID().toString()) + ); + } + + /** + * @throws Exception If failed. + */ + @Params(servers = 4, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + public void testStopServerAtomicPartitioned() throws Exception { + testStopServerTransactionalReplicated(); + } + + /** + * @throws Exception If failed. 
+ */ + @Params(servers = 4, atomicityMode = ATOMIC, cacheMode = REPLICATED) + public void testStopServerAtomicReplicated() throws Exception { + testStopServerTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(servers = 4, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + public void testStopServerTransactionalPartitioned() throws Exception { + testStopServerTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(servers = 4, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + public void testStopServerTransactionalReplicated() throws Exception { + doTest( + asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_LEFT), + () -> stopGrid(srvs.remove(srvs.size() - 1).name()) + ); + } + + /** + * @throws Exception If failed. + */ + @Params(baseline = 4, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + public void testRestartBaselineAtomicPartitioned() throws Exception { + testRestartBaselineTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(baseline = 4, atomicityMode = ATOMIC, cacheMode = REPLICATED) + public void testRestartBaselineAtomicReplicated() throws Exception { + testRestartBaselineTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(baseline = 4, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + public void testRestartBaselineTransactionalPartitioned() throws Exception { + testRestartBaselineTransactionalReplicated(); + } + + /** + * @throws Exception If failed. 
+ */ + @Params(baseline = 4, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + public void testRestartBaselineTransactionalReplicated() throws Exception { + doTest( + asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_JOINED), + () -> { + IgniteEx node = baseline.get(baseline.size() - 1); + + TestRecordingCommunicationSpi.spi(node).stopBlock(); + + stopGrid(node.name()); + + for (int i = 0; i < baselineServersCount() - 2; i++) + cntFinishedReadOperations.countDown(); + + startGrid(node.name()); + } + ); + } + + /** + * @throws Exception If failed. + */ + @Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + public void testUpdateBaselineTopologyAtomicPartitioned() throws Exception { + testUpdateBaselineTopologyTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(timeout = 5000L, atomicityMode = ATOMIC, cacheMode = REPLICATED) + public void testUpdateBaselineTopologyAtomicReplicated() throws Exception { + testUpdateBaselineTopologyTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + public void testUpdateBaselineTopologyTransactionalPartitioned() throws Exception { + testUpdateBaselineTopologyTransactionalReplicated(); + } + + /** + * @throws Exception If failed. 
+ */ + @Params(timeout = 5000L, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + public void testUpdateBaselineTopologyTransactionalReplicated() throws Exception { + doTest( + asMessagePredicate(discoEvt -> { + if (discoEvt instanceof DiscoveryCustomEvent) { + DiscoveryCustomEvent discoCustomEvt = (DiscoveryCustomEvent)discoEvt; + + DiscoveryCustomMessage customMsg = discoCustomEvt.customMessage(); + + return customMsg instanceof ChangeGlobalStateMessage; + } + + return false; + }), + () -> { + startNodesInClientMode(false); + + IgniteEx ignite = startGrid(UUID.randomUUID().toString()); + + baseline.get(0).cluster().setBaselineTopology(baseline.get(0).context().discovery().topologyVersion()); + + baseline.add(ignite); + } + ); + } + + /** + * @throws Exception If failed. + */ + @Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + public void testStopBaselineAtomicPartitioned() throws Exception { + testStopBaselineTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = REPLICATED) + public void testStopBaselineAtomicReplicated() throws Exception { + testStopBaselineTransactionalReplicated(); + } + + /** + * @throws Exception If failed. + */ + @Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + public void testStopBaselineTransactionalPartitioned() throws Exception { + testStopBaselineTransactionalReplicated(); + } + + /** + * @throws Exception If failed. 
+ */ + @Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + public void testStopBaselineTransactionalReplicated() throws Exception { + AtomicInteger cntDownCntr = new AtomicInteger(0); + + doTest( + asMessagePredicate(discoEvt -> discoEvt.type() == EventType.EVT_NODE_LEFT), + () -> { + IgniteEx node = baseline.get(baseline.size() - cntDownCntr.get() - 1); + + TestRecordingCommunicationSpi.spi(node).stopBlock(); + + cntDownCntr.incrementAndGet(); + + for (int i = 0; i < cntDownCntr.get(); i++) + cntFinishedReadOperations.countDown(); // This node and previously stopped nodes as well. + + stopGrid(node.name()); + } + ); + } + + /** + * Checks that given discovery event is from "Create cache" operation. + * + * @param discoEvt Discovery event. + */ + private static boolean createCachePredicate(DiscoveryEvent discoEvt) { + if (discoEvt instanceof DiscoveryCustomEvent) { + + DiscoveryCustomEvent discoCustomEvt = (DiscoveryCustomEvent)discoEvt; + + DiscoveryCustomMessage customMsg = discoCustomEvt.customMessage(); + + if (customMsg instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)customMsg; + + ExchangeActions exchangeActions = U.field(cacheChangeBatch, "exchangeActions"); + + Collection<CacheActionData> startRequests = exchangeActions.cacheStartRequests(); + + return !startRequests.isEmpty(); + } + } + + return false; + } + + /** + * Checks that given discovery event is from "Destroy cache" operation. + * + * @param discoEvt Discovery event. 
+ */ + private static boolean destroyCachePredicate(DiscoveryEvent discoEvt) { + if (discoEvt instanceof DiscoveryCustomEvent) { + + DiscoveryCustomEvent discoCustomEvt = (DiscoveryCustomEvent)discoEvt; + + DiscoveryCustomMessage customMsg = discoCustomEvt.customMessage(); + + if (customMsg instanceof DynamicCacheChangeBatch) { + DynamicCacheChangeBatch cacheChangeBatch = (DynamicCacheChangeBatch)customMsg; + + ExchangeActions exchangeActions = U.field(cacheChangeBatch, "exchangeActions"); + + Collection<CacheActionData> stopRequests = exchangeActions.cacheStopRequests(); + + return !stopRequests.isEmpty(); + } + } + + return false; + } + + /** + * Read operation that is going to be executed during blocking operation. + */ + @NotNull protected abstract CacheReadBackgroundOperation getReadOperation(); + + /** + * Checks that {@code block} closure doesn't block read operation. + * Does it for client, baseline and regular server node. + * + * @param blockMsgPred Predicate that checks whether the message corresponds to the {@code block} or not. + * @param block Blocking operation. + * @throws Exception If failed. + */ + public void doTest(Predicate<Message> blockMsgPred, RunnableX block) throws Exception { + BackgroundOperation backgroundOperation = new BlockMessageOnBaselineBackgroundOperation( + block, + blockMsgPred + ); + + CacheReadBackgroundOperation<?, ?> readOperation = getReadOperation(); + + readOperation.initCache(baseline.get(0), true); + + // Warmup. 
+ if (warmup() > 0) { + try (AutoCloseable read = readOperation.start()) { + Thread.sleep(warmup()); + } + + assertEquals( + readOperation.readOperationsFailed() + " read operations failed during warmup.", + 0, + readOperation.readOperationsFailed() + ); + + assertTrue( + "No read operations were finished during warmup.", + readOperation.readOperationsFinishedUnderBlock() > 0 + ); + } + + + doTest0(clients.get(0), readOperation, backgroundOperation); + + doTest0(srvs.get(0), readOperation, backgroundOperation); + + doTest0(baseline.get(0), readOperation, backgroundOperation); + + + try (AutoCloseable read = readOperation.start()) { + Thread.sleep(500L); + } + + assertEquals( + readOperation.readOperationsFailed() + " read operations failed during finish stage.", + 0, + readOperation.readOperationsFailed() + ); + + assertTrue( + "No read operations were finished during finish stage.", + readOperation.readOperationsFinishedUnderBlock() > 0 + ); + } + + /** + * Internal part for {@link CacheBlockOnReadAbstractTest#doTest(Predicate, RunnableX)}. + * + * @param ignite Ignite instance. Client / baseline / server node. + * @param readOperation Read operation. + * @param backgroundOperation Background operation. + */ + private void doTest0( + IgniteEx ignite, + CacheReadBackgroundOperation<?, ?> readOperation, + BackgroundOperation backgroundOperation + ) throws Exception { + // Reinit internal cache state with given ignite instance. + readOperation.initCache(ignite, false); + + cntFinishedReadOperations = new CountDownLatch(baseline.size() - 1); + + // Read while potentially blocking operation is executing. + try (AutoCloseable block = backgroundOperation.start()) { + cntFinishedReadOperations.await(5 * timeout(), TimeUnit.MILLISECONDS); + + // Possible if test itself is wrong. 
+ assertEquals("Messages weren't blocked in time", 0, cntFinishedReadOperations.getCount()); + + try (AutoCloseable read = readOperation.start()) { + Thread.sleep(timeout()); + } + } + finally { + cntFinishedReadOperations = null; + } + + log.info("Operations finished: " + readOperation.readOperationsFinishedUnderBlock()); + log.info("Longest operation took " + readOperation.maxReadDuration() + "ms"); + + // None of read operations should fail. + assertEquals( + readOperation.readOperationsFailed() + " read operations failed.", + 0, + readOperation.readOperationsFailed() + ); + + assertTrue( + "No read operations were finished during timeout.", + readOperation.readOperationsFinishedUnderBlock() > 0 + ); + + // There were no operations as long as blocking timeout. + assertNotAlmostEqual(timeout(), readOperation.maxReadDuration()); + + // On average every read operation was much faster than blocking timeout. + double avgDuration = (double)timeout() / readOperation.readOperationsFinishedUnderBlock(); + + assertTrue("Avarage duration was too long.",avgDuration < timeout() * 0.25); + } + + /** + * Utility class that allows starting and stopping some background operation many times. + */ + protected abstract static class BackgroundOperation { + /** */ + private IgniteInternalFuture<?> fut; + + /** + * Invoked strictly before background thread is started. + */ + protected void init() { + // No-op. + } + + /** + * Operation itself. Will be executed in separate thread. Thread interruption has to be considered as a valid + * way to stop operation. + */ + protected abstract void execute(); + + /** + * @return Allowed time to wait in {@link BackgroundOperation#stop()} method before canceling background thread. + */ + protected abstract long stopTimeout(); + + /** + * Start separate thread and execute method {@link BackgroundOperation#execute()} in it. + * + * @return {@link AutoCloseable} that invokes {@link BackgroundOperation#stop()} on closing. 
+ */ + AutoCloseable start() { + if (fut != null) + throw new UnsupportedOperationException("Only one simultanious operation is allowed"); + + init(); + + CountDownLatch threadStarted = new CountDownLatch(1); + + fut = GridTestUtils.runAsync(() -> { + try { + threadStarted.countDown(); + + execute(); + } + catch (Exception e) { + throw new IgniteException("Unexpected exception in background operation thread", e); + } + }); + + try { + threadStarted.await(); + } + catch (InterruptedException e) { + try { + fut.cancel(); + } + catch (IgniteCheckedException e1) { + e.addSuppressed(e1); + } + + throw new IgniteException(e); + } + + return this::stop; + } + + /** + * Interrupt the operation started in {@link BackgroundOperation#start()} method and join interrupted thread. + */ + void stop() throws Exception { + if (fut == null) + return; + + try { + fut.get(stopTimeout()); + } + catch (IgniteFutureTimeoutCheckedException e) { + fut.cancel(); + + fut.get(); + } + finally { + fut = null; + } + } + } + + /** + * @param discoEvtPred Predicate that tests discovery events. + * @return New predicate that test any message based on {@code discoEvtPred} predicate. + */ + public static Predicate<Message> asMessagePredicate(Predicate<DiscoveryEvent> discoEvtPred) { + return msg -> { + if (msg instanceof GridDhtPartitionsFullMessage) { + GridDhtPartitionsFullMessage fullMsg = (GridDhtPartitionsFullMessage)msg; + + GridDhtPartitionExchangeId exchangeId = fullMsg.exchangeId(); + + if (exchangeId != null) + return discoEvtPred.test(U.field(exchangeId, "discoEvt")); + } + + return false; + }; + } + + /** + * Background operation that executes some node request and doesn't allow its messages to be fully processed until + * operation is stopped. + */ + protected class BlockMessageOnBaselineBackgroundOperation extends BackgroundOperation { + /** */ + private final RunnableX block; + + /** */ + private final Predicate<Message> blockMsg; + + /** + * @param block Blocking operation. 
+ * @param blockMsgPred Predicate that checks whether to block message or not. + * + * @see BlockMessageOnBaselineBackgroundOperation#blockMessage(ClusterNode, Message) + */ + protected BlockMessageOnBaselineBackgroundOperation( + RunnableX block, + Predicate<Message> blockMsgPred + ) { + this.block = block; + blockMsg = blockMsgPred; + } + + /** {@inheritDoc} */ + @Override protected void execute() { + for (IgniteEx server : baseline) { + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(server); + + spi.blockMessages(this::blockMessage); + } + + block.run(); + } + + /** + * Function to pass into {@link TestRecordingCommunicationSpi#blockMessages(IgniteBiPredicate)}. + * + * @param node Node that receives message. + * @param msg Message. + * @return Whether the given message should be blocked or not. + */ + private boolean blockMessage(ClusterNode node, Message msg) { + boolean block = blockMsg.test(msg) + && baseline.stream().map(IgniteEx::name).anyMatch(node.consistentId()::equals); + + if (block) + cntFinishedReadOperations.countDown(); + + return block; + } + + /** {@inheritDoc} */ + @Override protected long stopTimeout() { + // Should be big enough so thread will stop on its own. Otherwise test will fail, but that's fine. + return 30_000L; + } + + /** {@inheritDoc} */ + @Override void stop() throws Exception { + for (IgniteEx server : baseline) { + TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(server); + + spi.stopBlock(); + } + + super.stop(); + } + } + + + /** + * Runnable that can throw exceptions. + */ + @FunctionalInterface + public interface RunnableX extends Runnable { + /** + * Closure body. + * + * @throws Exception If failed. + */ + void runx() throws Exception; + + /** {@inheritDoc} */ + @Override default void run() { + try { + runx(); + } + catch (Exception e) { + throw new IgniteException(e); + } + } + } + + /** + * {@link BackgroundOperation} implementation for cache reading operations. 
+ */ + protected abstract class ReadBackgroundOperation extends BackgroundOperation { + + /** Counter for successfully finished operations. */ + private final AtomicInteger readOperationsFinishedUnderBlock = new AtomicInteger(); + + /** Counter for failed operations. */ + private final AtomicInteger readOperationsFailed = new AtomicInteger(); + + /** Duration of the longest read operation. */ + private final AtomicLong maxReadDuration = new AtomicLong(-1); + + /** + * Do single iteration of reading operation. Will be executed in a loop. + */ + protected abstract void doRead() throws Exception; + + + /** {@inheritDoc} */ + @Override protected void init() { + readOperationsFinishedUnderBlock.set(0); + + readOperationsFailed.set(0); + + maxReadDuration.set(-1); + } + + /** {@inheritDoc} */ + @Override protected void execute() { + Set<String> loggedMessages = new HashSet<>(); + + while (!Thread.currentThread().isInterrupted()) { + long prevTs = System.currentTimeMillis(); + + try { + doRead(); + + readOperationsFinishedUnderBlock.incrementAndGet(); + } + catch (Exception e) { + boolean threadInterrupted = X.hasCause(e, + InterruptedException.class, + IgniteInterruptedException.class, + IgniteInterruptedCheckedException.class + ); + + if (threadInterrupted) + Thread.currentThread().interrupt(); + else if (allowException() && X.hasCause(e, ClusterTopologyCheckedException.class)) + readOperationsFinishedUnderBlock.incrementAndGet(); + else { + readOperationsFailed.incrementAndGet(); + + if (loggedMessages.add(e.getMessage())) + log.error("Error during read operation execution", e); + + continue; + } + } + + maxReadDuration.set(Math.max(maxReadDuration.get(), System.currentTimeMillis() - prevTs)); + } + } + + /** {@inheritDoc} */ + @Override protected long stopTimeout() { + return 0; + } + + /** + * @return Number of successfully finished operations. 
+ */ + public int readOperationsFinishedUnderBlock() { + return readOperationsFinishedUnderBlock.get(); + } + + /** + * @return Number of failed operations. + */ + public int readOperationsFailed() { + return readOperationsFailed.get(); + } + + /** + * @return Duration of the longest read operation. + */ + public long maxReadDuration() { + return maxReadDuration.get(); + } + } + + /** + * + */ + protected abstract class CacheReadBackgroundOperation<KeyType, ValueType> extends ReadBackgroundOperation { + /** + * {@link CacheReadBackgroundOperation#cache()} method backing field. Updated on each + * {@link CacheReadBackgroundOperation#initCache(IgniteEx, boolean)} invocation. + */ + private IgniteCache<KeyType, ValueType> cache; + + /** + * Reinit internal cache using passed ignite instance and fill it with data if required. + * + * @param ignite Node to get or create cache from. + * @param fillData Whether the cache should be filled with new data or not. + */ + public void initCache(IgniteEx ignite, boolean fillData) { + cache = ignite.getOrCreateCache( + createCacheConfiguration() + .setAtomicityMode(atomicityMode()) + .setCacheMode(cacheMode()) + ); + + if (fillData) { + try (IgniteDataStreamer<KeyType, ValueType> dataStreamer = ignite.dataStreamer(cache.getName())) { + dataStreamer.allowOverwrite(true); + + for (int i = 0; i < entriesCount(); i++) + dataStreamer.addData(createKey(i), createValue(i)); + } + } + } + + /** + * @return Cache configuration. + */ + protected CacheConfiguration<KeyType, ValueType> createCacheConfiguration() { + return new CacheConfiguration<KeyType, ValueType>(DEFAULT_CACHE_NAME) + .setBackups(backupsCount()) + .setAffinity( + new RendezvousAffinityFunction() + .setPartitions(32) + ); + } + + /** + * @return Current cache. + */ + protected final IgniteCache<KeyType, ValueType> cache() { + return cache; + } + + /** + * @return Count of cache entries to create in {@link CacheReadBackgroundOperation#initCache(IgniteEx, boolean)} + * method. 
+ */ + protected int entriesCount() { + return DFLT_CACHE_ENTRIES_CNT; + } + + /** + * @param idx Unique number. + * @return Key to be used for inserting into cache. + * @see CacheReadBackgroundOperation#createValue(int) + */ + protected abstract KeyType createKey(int idx); + + /** + * @param idx Unique number. + * @return Value to be used for inserting into cache. + * @see CacheReadBackgroundOperation#createKey(int) + */ + protected abstract ValueType createValue(int idx); + } + + /** + * {@link CacheReadBackgroundOperation} implementation for (int -> int) cache. Keys and values are equal by default. + */ + protected abstract class IntCacheReadBackgroundOperation + extends CacheReadBackgroundOperation<Integer, Integer> { + /** {@inheritDoc} */ + @Override protected Integer createKey(int idx) { + return idx; + } + + /** {@inheritDoc} */ + @Override protected Integer createValue(int idx) { + return idx; + } + } + + /** + * @return {@link Params} annotation object from the current test method. + */ + protected Params currentTestParams() { + Params params = currentTestAnnotation(Params.class); + + assertNotNull("Test " + getName() + " is not annotated with @Param annotation.", params); + + return params; + } + + /** + * Assert that two numbers are close to each other. + */ + private static void assertAlmostEqual(long exp, long actual) { + assertTrue(String.format("Numbers differ too much [exp=%d, actual=%d]", exp, actual), almostEqual(exp, actual)); + } + + /** + * Assert that two numbers are not close to each other. + */ + private static void assertNotAlmostEqual(long exp, long actual) { + assertFalse(String.format("Numbers are almost equal [exp=%d, actual=%d]", exp, actual), almostEqual(exp, actual)); + } + + /** + * Check that two numbers are close to each other. 
+ */ + private static boolean almostEqual(long exp, long actual) { + double rel = (double)(actual - exp) / exp; + + return Math.abs(rel) < 0.05; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnScanTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnScanTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnScanTest.java new file mode 100644 index 0000000..2912d05 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnScanTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.Objects; +import java.util.Random; +import org.apache.ignite.cache.query.ScanQuery; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class CacheBlockOnScanTest extends CacheBlockOnReadAbstractTest { + + /** {@inheritDoc} */ + @Override @NotNull protected CacheReadBackgroundOperation<?, ?> getReadOperation() { + return new IntCacheReadBackgroundOperation() { + /** Random. */ + private Random random = new Random(); + + /** {@inheritDoc} */ + @Override public void doRead() { + int idx = random.nextInt(entriesCount()); + + cache().query(new ScanQuery<>((k, v) -> Objects.equals(k, idx))).getAll(); + } + }; + } + + /** {@inheritDoc} */ + @Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = PARTITIONED, allowException = true) + @Override public void testStopBaselineAtomicPartitioned() throws Exception { + super.testStopBaselineAtomicPartitioned(); + } + + /** {@inheritDoc} */ + @Params(baseline = 9, atomicityMode = ATOMIC, cacheMode = REPLICATED, allowException = true) + @Override public void testStopBaselineAtomicReplicated() throws Exception { + super.testStopBaselineAtomicReplicated(); + } + + /** {@inheritDoc} */ + @Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED, allowException = true) + @Override public void testStopBaselineTransactionalPartitioned() throws Exception { + super.testStopBaselineTransactionalPartitioned(); + } + + /** {@inheritDoc} */ + @Params(baseline = 9, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED, allowException = true) + @Override public void testStopBaselineTransactionalReplicated() throws Exception { + 
super.testStopBaselineTransactionalReplicated(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSingleGetTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSingleGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSingleGetTest.java new file mode 100644 index 0000000..fc181be --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheBlockOnSingleGetTest.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed; + +import java.util.Random; +import org.jetbrains.annotations.NotNull; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; + +/** + * + */ +public class CacheBlockOnSingleGetTest extends CacheBlockOnReadAbstractTest { + + /** {@inheritDoc} */ + @Override @NotNull protected CacheReadBackgroundOperation<?, ?> getReadOperation() { + return new IntCacheReadBackgroundOperation() { + /** Random. */ + private Random random = new Random(); + + /** {@inheritDoc} */ + @Override public void doRead() { + for (int i = 0; i < 300; i++) + cache().get(random.nextInt(entriesCount())); + } + }; + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testStopBaselineAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9915"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testStopBaselineAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9915"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testStopBaselineTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9915"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testStopBaselineTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9915"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testCreateCacheAtomicPartitioned() { + 
fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testCreateCacheAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testCreateCacheTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testCreateCacheTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testDestroyCacheAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testDestroyCacheAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testDestroyCacheTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testDestroyCacheTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testStartServerAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = 
REPLICATED) + @Override public void testStartServerAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testStartServerTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testStartServerTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testStopServerAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testStopServerAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testStopServerTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testStopServerTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = PARTITIONED) + @Override public void testUpdateBaselineTopologyAtomicPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = ATOMIC, cacheMode = REPLICATED) + @Override public void testUpdateBaselineTopologyAtomicReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + 
/** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = PARTITIONED) + @Override public void testUpdateBaselineTopologyTransactionalPartitioned() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } + + /** {@inheritDoc} */ + @Params(baseline = 1, atomicityMode = TRANSACTIONAL, cacheMode = REPLICATED) + @Override public void testUpdateBaselineTopologyTransactionalReplicated() { + fail("https://issues.apache.org/jira/browse/IGNITE-9883"); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java index f1d6682..057087e 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridAbstractTest.java @@ -19,6 +19,7 @@ package org.apache.ignite.testframework.junits; import java.io.ObjectStreamException; import java.io.Serializable; +import java.lang.annotation.Annotation; import java.lang.reflect.Field; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; @@ -118,6 +119,7 @@ import org.apache.log4j.Logger; import org.apache.log4j.PatternLayout; import org.apache.log4j.Priority; import org.apache.log4j.RollingFileAppender; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; @@ -197,6 +199,9 @@ public abstract class GridAbstractTest extends TestCase { /** Number of tests. */ private int testCnt; + /** Lazily initialized current test method. 
*/
+    private volatile Method currTestMtd;
+
     /**
      *
      */
@@ -670,6 +675,43 @@ public abstract class GridAbstractTest extends TestCase {
     }
 
     /**
+     * Looks up the public no-argument method named after the current test and caches it.
+     *
+     * @return Current test method.
+     * @throws NoSuchMethodError If method wasn't found for some reason.
+     */
+    @NotNull protected Method currentTestMethod() {
+        Method mtd = currTestMtd;
+
+        if (mtd == null) {
+            try {
+                currTestMtd = mtd = getClass().getMethod(getName());
+            }
+            catch (NoSuchMethodException e) {
+                NoSuchMethodError err = new NoSuchMethodError("Current test method is not found: " + getName());
+
+                // NoSuchMethodError has no constructor accepting a cause, so attach the original exception explicitly.
+                err.initCause(e);
+
+                throw err;
+            }
+        }
+
+        return mtd;
+    }
+
+    /**
+     * Search for the annotation of the given type in current test method.
+     *
+     * @param annotationCls Type of annotation to look for.
+     * @param <A> Annotation type.
+     * @return Instance of annotation if it is present in test method, {@code null} otherwise.
+     */
+    @Nullable protected <A extends Annotation> A currentTestAnnotation(Class<A> annotationCls) {
+        return currentTestMethod().getAnnotation(annotationCls);
+    }
+
+    /**
      * @return Started grid.
      * @throws Exception If anything failed.
      */
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnReadOperationsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnReadOperationsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnReadOperationsTestSuite.java
new file mode 100755
index 0000000..79ec18b
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheBlockExchangeOnReadOperationsTestSuite.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import java.util.Set;
+import junit.framework.TestSuite;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnGetAllTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnScanTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheBlockOnSingleGetTest;
+
+/**
+ * Test suite with the tests checking that read operations are not blocked.
+ */
+public class IgniteCacheBlockExchangeOnReadOperationsTestSuite extends TestSuite {
+    /**
+     * @return IgniteCache test suite.
+     * @throws Exception Thrown in case of the failure.
+     */
+    public static TestSuite suite() throws Exception {
+        return suite(null);
+    }
+
+    /**
+     * @param ignoredTests Tests to ignore, {@code null} means that nothing is ignored.
+     * @return Test suite.
+     */
+    public static TestSuite suite(Set<Class> ignoredTests) {
+        TestSuite suite = new TestSuite("Do Not Block Read Operations Test Suite");
+
+        // Each test class is registered only if the caller did not ask to ignore it.
+        if (ignoredTests == null || !ignoredTests.contains(CacheBlockOnSingleGetTest.class))
+            suite.addTestSuite(CacheBlockOnSingleGetTest.class);
+
+        if (ignoredTests == null || !ignoredTests.contains(CacheBlockOnGetAllTest.class))
+            suite.addTestSuite(CacheBlockOnGetAllTest.class);
+
+        if (ignoredTests == null || !ignoredTests.contains(CacheBlockOnScanTest.class))
+            suite.addTestSuite(CacheBlockOnScanTest.class);
+
+        return suite;
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/3fae41b1/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
index 6c48ecc..d0734a8 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite7.java
@@ -33,9 +33,9 @@ import org.apache.ignite.internal.processors.cache.WalModeChangeAdvancedSelfTest
 import org.apache.ignite.internal.processors.cache.WalModeChangeCoordinatorNotAffinityNodeSelfTest;
 import org.apache.ignite.internal.processors.cache.WalModeChangeSelfTest;
 import org.apache.ignite.internal.processors.cache.distributed.Cache64kPartitionsTest;
+import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest;
 import org.apache.ignite.internal.processors.cache.distributed.CachePageWriteLockUnlockTest;
 import org.apache.ignite.internal.processors.cache.distributed.CacheRentingStateRepairTest;
-import org.apache.ignite.internal.processors.cache.distributed.CacheDataLossOnPartitionMoveTest;
 import org.apache.ignite.internal.processors.cache.distributed.dht.IgniteCacheStartWithLoadTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingPartitionCountersTest;
 import org.apache.ignite.internal.processors.cache.distributed.rebalancing.GridCacheRebalancingWithAsyncClearingTest;
