http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java new file mode 100644 index 0000000..d2a2f49 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractConcurrentSelfTest.java @@ -0,0 +1,921 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.processors.query.GridQueryProcessor; +import org.apache.ignite.internal.processors.query.QueryIndexDescriptorImpl; +import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing; +import org.apache.ignite.internal.processors.query.schema.SchemaIndexCacheVisitor; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; +import org.apache.ignite.internal.util.typedef.T2; +import org.jetbrains.annotations.Nullable; + +import javax.cache.Cache; +import java.util.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Concurrency tests for dynamic index create/drop. + */ +@SuppressWarnings("unchecked") +public abstract class DynamicIndexAbstractConcurrentSelfTest extends DynamicIndexAbstractSelfTest { + /** Test duration. */ + private static final long TEST_DUR = 10_000L; + + /** Large cache size. */ + private static final int LARGE_CACHE_SIZE = 100_000; + + /** Latches to block certain index operations. 
*/ + private static final ConcurrentHashMap<UUID, T2<CountDownLatch, AtomicBoolean>> BLOCKS = new ConcurrentHashMap<>(); + + /** Cache mode. */ + private final CacheMode cacheMode; + + /** Atomicity mode. */ + private final CacheAtomicityMode atomicityMode; + + /** + * Constructor. + * + * @param cacheMode Cache mode. + * @param atomicityMode Atomicity mode. + */ + protected DynamicIndexAbstractConcurrentSelfTest(CacheMode cacheMode, CacheAtomicityMode atomicityMode) { + this.cacheMode = cacheMode; + this.atomicityMode = atomicityMode; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + GridQueryProcessor.idxCls = BlockingIndexing.class; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + GridQueryProcessor.idxCls = null; + + for (T2<CountDownLatch, AtomicBoolean> block : BLOCKS.values()) + block.get1().countDown(); + + BLOCKS.clear(); + + stopAllGrids(); + + super.afterTest(); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return 5 * 60 * 1000L; + } + + /** {@inheritDoc} */ + @Override protected CacheConfiguration<KeyClass, ValueClass> cacheConfiguration() { + CacheConfiguration<KeyClass, ValueClass> ccfg = super.cacheConfiguration(); + + return ccfg.setCacheMode(cacheMode).setAtomicityMode(atomicityMode); + } + + /** + * Make sure that coordinator migrates correctly between nodes. + * + * @throws Exception If failed. + */ + public void testCoordinatorChange() throws Exception { + // Start servers. + Ignite srv1 = Ignition.start(serverConfiguration(1)); + Ignite srv2 = Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + Ignition.start(serverConfiguration(4)); + + UUID srv1Id = srv1.cluster().localNode().id(); + UUID srv2Id = srv2.cluster().localNode().id(); + + // Start client which will execute operations. 
+ Ignite cli = Ignition.start(clientConfiguration(5)); + + cli.getOrCreateCache(cacheConfiguration()); + + put(srv1, 0, KEY_AFTER); + + // Test migration between normal servers. + blockIndexing(srv1Id); + + QueryIndex idx1 = index(IDX_NAME_1, field(FIELD_NAME_1)); + + IgniteInternalFuture<?> idxFut1 = queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx1, false); + + Thread.sleep(100); + + //srv1.close(); + Ignition.stop(srv1.name(), true); + + unblockIndexing(srv1Id); + + idxFut1.get(); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1); + + // Test migration from normal server to non-affinity server. + blockIndexing(srv2Id); + + QueryIndex idx2 = index(IDX_NAME_2, field(alias(FIELD_NAME_2))); + + IgniteInternalFuture<?> idxFut2 = queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx2, false); + + Thread.sleep(100); + + //srv2.close(); + Ignition.stop(srv2.name(), true); + + unblockIndexing(srv2Id); + + idxFut2.get(); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(alias(FIELD_NAME_2))); + assertIndexUsed(IDX_NAME_2, SQL_SIMPLE_FIELD_2, SQL_ARG_1); + assertSqlSimpleData(SQL_SIMPLE_FIELD_2, KEY_AFTER - SQL_ARG_1); + } + + /** + * Test operations join. + * + * @throws Exception If failed. 
+ */ + public void testOperationChaining() throws Exception { + Ignite srv1 = Ignition.start(serverConfiguration(1)); + + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + Ignition.start(clientConfiguration(4)); + + srv1.getOrCreateCache(cacheConfiguration()); + + blockIndexing(srv1); + + QueryIndex idx1 = index(IDX_NAME_1, field(FIELD_NAME_1)); + QueryIndex idx2 = index(IDX_NAME_2, field(alias(FIELD_NAME_2))); + + IgniteInternalFuture<?> idxFut1 = queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx1, false); + IgniteInternalFuture<?> idxFut2 = queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx2, false); + + // Start even more nodes of different flavors + Ignition.start(serverConfiguration(5)); + Ignition.start(serverConfiguration(6, true)); + Ignition.start(clientConfiguration(7)); + + assert !idxFut1.isDone(); + assert !idxFut2.isDone(); + + unblockIndexing(srv1); + + idxFut1.get(); + idxFut2.get(); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_2, field(alias(FIELD_NAME_2))); + + Thread.sleep(100); + + put(srv1, 0, KEY_AFTER); + + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + assertIndexUsed(IDX_NAME_2, SQL_SIMPLE_FIELD_2, SQL_ARG_1); + + assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1); + assertSqlSimpleData(SQL_SIMPLE_FIELD_2, KEY_AFTER - SQL_ARG_1); + } + + /** + * Test node join on pending operation. + * + * @throws Exception If failed. 
+ */ + public void testNodeJoinOnPendingOperation() throws Exception { + Ignite srv1 = Ignition.start(serverConfiguration(1)); + + srv1.getOrCreateCache(cacheConfiguration()); + + blockIndexing(srv1); + + QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + IgniteInternalFuture<?> idxFut = queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false); + + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + Ignition.start(clientConfiguration(4)); + + assert !idxFut.isDone(); + + unblockIndexing(srv1); + + idxFut.get(); + + Thread.sleep(100L); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + + put(srv1, 0, KEY_AFTER); + + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1); + } + + /** + * PUT/REMOVE data from cache and build index concurrently. + * + * @throws Exception If failed, + */ + public void testConcurrentPutRemove() throws Exception { + // Start several nodes. + Ignite srv1 = Ignition.start(serverConfiguration(1)); + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3)); + Ignition.start(serverConfiguration(4)); + + awaitPartitionMapExchange(); + + IgniteCache<BinaryObject, BinaryObject> cache = srv1.createCache(cacheConfiguration()).withKeepBinary(); + + // Start data change operations from several threads. 
+ final AtomicBoolean stopped = new AtomicBoolean(); + + IgniteInternalFuture updateFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + int key = ThreadLocalRandom.current().nextInt(0, LARGE_CACHE_SIZE); + int val = ThreadLocalRandom.current().nextInt(); + + BinaryObject keyObj = key(node, key); + + if (ThreadLocalRandom.current().nextBoolean()) { + BinaryObject valObj = value(node, val); + + node.cache(CACHE_NAME).put(keyObj, valObj); + } + else + node.cache(CACHE_NAME).remove(keyObj); + } + + return null; + } + }, 4); + + // Let some to arrive. + Thread.sleep(500L); + + // Create index. + QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false).get(); + + // Stop updates once index is ready. + stopped.set(true); + + updateFut.get(); + + // Make sure index is there. + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + + // Get expected values. + Map<Long, Long> expKeys = new HashMap<>(); + + for (int i = 0; i < LARGE_CACHE_SIZE; i++) { + BinaryObject val = cache.get(key(srv1, i)); + + if (val != null) { + long fieldVal = val.field(FIELD_NAME_1); + + if (fieldVal >= SQL_ARG_1) + expKeys.put((long)i, fieldVal); + } + } + + // Validate query result. 
+ for (Ignite node : Ignition.allGrids()) { + IgniteCache<BinaryObject, BinaryObject> nodeCache = node.cache(CACHE_NAME).withKeepBinary(); + + SqlQuery qry = new SqlQuery(tableName(ValueClass.class), SQL_SIMPLE_FIELD_1).setArgs(SQL_ARG_1); + + List<Cache.Entry<BinaryObject, BinaryObject>> res = nodeCache.query(qry).getAll(); + + assertEquals("Cache size mismatch [exp=" + expKeys.size() + ", actual=" + res.size() + ']', + expKeys.size(), res.size()); + + for (Cache.Entry<BinaryObject, BinaryObject> entry : res) { + long key = entry.getKey().field(FIELD_KEY); + Long fieldVal = entry.getValue().field(FIELD_NAME_1); + + assertTrue("Expected key is not in result set: " + key, expKeys.containsKey(key)); + + assertEquals("Unexpected value [key=" + key + ", expVal=" + expKeys.get(key) + + ", actualVal=" + fieldVal + ']', expKeys.get(key), fieldVal); + } + + } + } + + /** + * Test index consistency on re-balance. + * + * @throws Exception If failed. + */ + public void testConcurrentRebalance() throws Exception { + // Start cache and populate it with data. + Ignite srv1 = Ignition.start(serverConfiguration(1)); + Ignite srv2 = Ignition.start(serverConfiguration(2)); + + srv1.createCache(cacheConfiguration()); + + awaitPartitionMapExchange(); + + put(srv1, 0, LARGE_CACHE_SIZE); + + // Start index operation in blocked state. + blockIndexing(srv1); + blockIndexing(srv2); + + QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + final IgniteInternalFuture<?> idxFut = + queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false); + + Thread.sleep(100); + + // Start two more nodes and unblock index operation in the middle. + Ignition.start(serverConfiguration(3)); + + unblockIndexing(srv1); + unblockIndexing(srv2); + + Ignition.start(serverConfiguration(4)); + + awaitPartitionMapExchange(); + + // Validate index state. 
+ idxFut.get(); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + assertSqlSimpleData(SQL_SIMPLE_FIELD_1, LARGE_CACHE_SIZE - SQL_ARG_1); + } + + /** + * Check what happen in case cache is destroyed before operation is started. + * + * @throws Exception If failed. + */ + public void testConcurrentCacheDestroy() throws Exception { + // Start complex topology. + Ignite srv1 = Ignition.start(serverConfiguration(1)); + + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + + Ignite cli = Ignition.start(clientConfiguration(4)); + + // Start cache and populate it with data. + IgniteCache cache = cli.getOrCreateCache(cacheConfiguration()); + + put(cli, KEY_AFTER); + + // Start index operation and block it on coordinator. + blockIndexing(srv1); + + QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + final IgniteInternalFuture<?> idxFut = + queryProcessor(srv1).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, false); + + Thread.sleep(100); + + // Destroy cache. + cache.destroy(); + + // Unblock indexing and see what happens. + unblockIndexing(srv1); + + try { + idxFut.get(); + + fail("Exception has not been thrown."); + } + catch (SchemaOperationException e) { + // No-op. + } + } + + /** + * Make sure that contended operations on the same index from different nodes do not hang. + * + * @throws Exception If failed. + */ + public void testConcurrentOperationsMultithreaded() throws Exception { + // Start complex topology. + Ignition.start(serverConfiguration(1)); + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + + Ignite cli = Ignition.start(clientConfiguration(4)); + + cli.createCache(cacheConfiguration()); + + final AtomicBoolean stopped = new AtomicBoolean(); + + // Start several threads which will mess around indexes. 
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + IgniteInternalFuture idxFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + boolean exists = false; + + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + IgniteInternalFuture fut; + + if (exists) { + fut = queryProcessor(node).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true); + + exists = false; + } + else { + fut = queryProcessor(node).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true); + + exists = true; + } + + try { + fut.get(); + } + catch (SchemaOperationException e) { + // No-op. + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + } + + return null; + } + }, 8); + + Thread.sleep(TEST_DUR); + + stopped.set(true); + + // Make sure nothing hanged. + idxFut.get(); + + queryProcessor(cli).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true).get(); + queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true).get(); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + + put(cli, 0, KEY_AFTER); + + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1); + } + + /** + * Make sure that contended operations on the same index from different nodes do not hang when we issue both + * CREATE/DROP and SELECT statements. + * + * @throws Exception If failed. + */ + public void testQueryConsistencyMultithreaded() throws Exception { + // Start complex topology. + Ignition.start(serverConfiguration(1)); + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + + Ignite cli = Ignition.start(clientConfiguration(4)); + + cli.createCache(cacheConfiguration()); + + put(cli, 0, KEY_AFTER); + + final AtomicBoolean stopped = new AtomicBoolean(); + + // Thread which will mess around indexes. 
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + IgniteInternalFuture idxFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + boolean exists = false; + + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + IgniteInternalFuture fut; + + if (exists) { + fut = queryProcessor(node).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true); + + exists = false; + } + else { + fut = queryProcessor(node).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true); + + exists = true; + } + + try { + fut.get(); + } + catch (SchemaOperationException e) { + // No-op. + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + } + + return null; + } + }, 1); + + IgniteInternalFuture qryFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + assertSqlSimpleData(node, SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1); + } + + return null; + } + }, 8); + + Thread.sleep(TEST_DUR); + + stopped.set(true); + + // Make sure nothing hanged. + idxFut.get(); + qryFut.get(); + } + + /** + * Test concurrent node start/stop along with index operations. Nothing should hang. + * + * @throws Exception If failed. + */ + public void testConcurrentOperationsAndNodeStartStopMultithreaded() throws Exception { + // Start several stable nodes. + Ignition.start(serverConfiguration(1)); + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + + final Ignite cli = Ignition.start(clientConfiguration(4)); + + cli.createCache(cacheConfiguration()); + + final AtomicBoolean stopped = new AtomicBoolean(); + + // Start node start/stop worker. 
+ final AtomicInteger nodeIdx = new AtomicInteger(4); + + IgniteInternalFuture startStopFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + boolean exists = false; + + int lastIdx = 0; + + while (!stopped.get()) { + if (exists) { + stopGrid(lastIdx); + + exists = false; + } + else { + lastIdx = nodeIdx.incrementAndGet(); + + IgniteConfiguration cfg; + + switch (ThreadLocalRandom.current().nextInt(0, 3)) { + case 1: + cfg = serverConfiguration(lastIdx, false); + + break; + + case 2: + + cfg = serverConfiguration(lastIdx, true); + + break; + + default: + cfg = clientConfiguration(lastIdx); + } + + Ignition.start(cfg); + + exists = true; + } + + Thread.sleep(ThreadLocalRandom.current().nextLong(500L, 1500L)); + } + + return null; + } + }, 1); + + // Start several threads which will mess around indexes. + final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + IgniteInternalFuture idxFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + boolean exists = false; + + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + IgniteInternalFuture fut; + + if (exists) { + fut = queryProcessor(node).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true); + + exists = false; + } + else { + fut = queryProcessor(node).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true); + + exists = true; + } + + try { + fut.get(); + } + catch (SchemaOperationException e) { + // No-op. + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + } + + return null; + } + }, 1); + + Thread.sleep(TEST_DUR); + + stopped.set(true); + + // Make sure nothing hanged. + startStopFut.get(); + idxFut.get(); + + // Make sure cache is operational at this point. 
+ cli.getOrCreateCache(cacheConfiguration()); + + queryProcessor(cli).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true).get(); + queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true).get(); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + + put(cli, 0, KEY_AFTER); + + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1); + } + + /** + * Multithreaded cache start/stop along with index operations. Nothing should hang. + * + * @throws Exception If failed. + */ + public void testConcurrentOperationsAndCacheStartStopMultithreaded() throws Exception { + // Start complex topology. + Ignition.start(serverConfiguration(1)); + Ignition.start(serverConfiguration(2)); + Ignition.start(serverConfiguration(3, true)); + + Ignite cli = Ignition.start(clientConfiguration(4)); + + final AtomicBoolean stopped = new AtomicBoolean(); + + // Start cache create/destroy worker. + IgniteInternalFuture startStopFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + boolean exists = false; + + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + if (exists) { + node.destroyCache(CACHE_NAME); + + exists = false; + } + else { + node.createCache(cacheConfiguration()); + + exists = true; + } + + Thread.sleep(ThreadLocalRandom.current().nextLong(200L, 400L)); + } + + return null; + } + }, 1); + + // Start several threads which will mess around indexes. 
+ final QueryIndex idx = index(IDX_NAME_1, field(FIELD_NAME_1)); + + IgniteInternalFuture idxFut = multithreadedAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + boolean exists = false; + + while (!stopped.get()) { + Ignite node = grid(ThreadLocalRandom.current().nextInt(1, 5)); + + IgniteInternalFuture fut; + + if (exists) { + fut = queryProcessor(node).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true); + + exists = false; + } + else { + fut = queryProcessor(node).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true); + + exists = true; + } + + try { + fut.get(); + } + catch (SchemaOperationException e) { + // No-op. + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + } + + return null; + } + }, 8); + + Thread.sleep(TEST_DUR); + + stopped.set(true); + + // Make sure nothing hanged. + startStopFut.get(); + idxFut.get(); + + // Make sure cache is operational at this point. + cli.getOrCreateCache(cacheConfiguration()); + + queryProcessor(cli).dynamicIndexDrop(CACHE_NAME, IDX_NAME_1, true).get(); + queryProcessor(cli).dynamicIndexCreate(CACHE_NAME, TBL_NAME, idx, true).get(); + + assertIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1, field(FIELD_NAME_1)); + + put(cli, 0, KEY_AFTER); + + assertIndexUsed(IDX_NAME_1, SQL_SIMPLE_FIELD_1, SQL_ARG_1); + assertSqlSimpleData(SQL_SIMPLE_FIELD_1, KEY_AFTER - SQL_ARG_1); + } + + /** + * Block indexing. + * + * @param node Node. + */ + @SuppressWarnings("SuspiciousMethodCalls") + private static void blockIndexing(Ignite node) { + UUID nodeId = ((IgniteEx)node).localNode().id(); + + blockIndexing(nodeId); + } + + /** + * Block indexing. + * + * @param nodeId Node. + */ + @SuppressWarnings("SuspiciousMethodCalls") + private static void blockIndexing(UUID nodeId) { + assertFalse(BLOCKS.contains(nodeId)); + + BLOCKS.put(nodeId, new T2<>(new CountDownLatch(1), new AtomicBoolean())); + } + + /** + * Unblock indexing. + * + * @param node Node. 
+ */ + private static void unblockIndexing(Ignite node) { + UUID nodeId = ((IgniteEx)node).localNode().id(); + + unblockIndexing(nodeId); + } + + /** + * Unblock indexing. + * + * @param nodeId Node ID. + */ + private static void unblockIndexing(UUID nodeId) { + T2<CountDownLatch, AtomicBoolean> blocker = BLOCKS.remove(nodeId); + + assertNotNull(blocker); + + blocker.get1().countDown(); + } + + /** + * Await indexing. + * + * @param nodeId Node ID. + */ + private static void awaitIndexing(UUID nodeId) { + T2<CountDownLatch, AtomicBoolean> blocker = BLOCKS.get(nodeId); + + if (blocker != null) { + assertTrue(blocker.get2().compareAndSet(false, true)); + + while (true) { + try { + blocker.get1().await(); + + break; + } + catch (InterruptedException e) { + // No-op. + } + } + } + } + + /** + * Blocking indexing processor. + */ + private static class BlockingIndexing extends IgniteH2Indexing { + /** {@inheritDoc} */ + @Override public void dynamicIndexCreate(@Nullable String spaceName, String tblName, + QueryIndexDescriptorImpl idxDesc, boolean ifNotExists, SchemaIndexCacheVisitor cacheVisitor) + throws IgniteCheckedException { + awaitIndexing(ctx.localNodeId()); + + super.dynamicIndexCreate(spaceName, tblName, idxDesc, ifNotExists, cacheVisitor); + } + + /** {@inheritDoc} */ + @Override public void dynamicIndexDrop(@Nullable String spaceName, String idxName, boolean ifExists) + throws IgniteCheckedException{ + awaitIndexing(ctx.localNodeId()); + + super.dynamicIndexDrop(spaceName, idxName, ifExists); + } + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java new file mode 100644 index 0000000..e52e0d3 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexAbstractSelfTest.java @@ -0,0 +1,467 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteDataStreamer; +import org.apache.ignite.Ignition; +import org.apache.ignite.binary.BinaryObject; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.cache.query.SqlQuery; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; + +import javax.cache.Cache; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.UUID; + +/** + * Tests for dynamic index creation. + */ +@SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) +public abstract class DynamicIndexAbstractSelfTest extends AbstractSchemaSelfTest { + /** Attribute to filter node out of cache data nodes. */ + protected static final String ATTR_FILTERED = "FILTERED"; + + /** Key range limit for "before" step. */ + protected static final int KEY_BEFORE = 100; + + /** Key range limit for "after" step. */ + protected static final int KEY_AFTER = 200; + + /** SQL to check index on the field 1. 
*/ + protected static final String SQL_SIMPLE_FIELD_1 = "SELECT * FROM " + TBL_NAME + " WHERE " + FIELD_NAME_1 + " >= ?"; + + /** SQL to check composite index */ + protected static final String SQL_COMPOSITE = "SELECT * FROM " + TBL_NAME + " WHERE " + FIELD_NAME_1 + + " >= ? AND " + alias(FIELD_NAME_2) + " >= ?"; + + /** SQL to check index on the field 2. */ + protected static final String SQL_SIMPLE_FIELD_2 = + "SELECT * FROM " + TBL_NAME + " WHERE " + alias(FIELD_NAME_2) + " >= ?"; + + /** Argument for simple SQL (1). */ + protected static final int SQL_ARG_1 = 40; + + /** Argument for simple SQL (2). */ + protected static final int SQL_ARG_2 = 80; + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * Create server configuration. + * + * @param idx Index. + * @return Configuration. + * @throws Exception If failed. + */ + protected IgniteConfiguration serverConfiguration(int idx) throws Exception { + return serverConfiguration(idx, false); + } + + /** + * Create server configuration. + * + * @param idx Index. + * @param filter Whether to filter the node out of cache. + * @return Configuration. + * @throws Exception If failed. + */ + protected IgniteConfiguration serverConfiguration(int idx, boolean filter) throws Exception { + IgniteConfiguration cfg = commonConfiguration(idx); + + if (filter) + cfg.setUserAttributes(Collections.singletonMap(ATTR_FILTERED, true)); + + return cfg; + } + + /** + * Create client configuration. + * + * @param idx Index. + * @return Configuration. + * @throws Exception If failed. + */ + protected IgniteConfiguration clientConfiguration(int idx) throws Exception { + return commonConfiguration(idx).setClientMode(true); + } + + /** + * Create common node configuration. + * + * @param idx Index. + * @return Configuration. + * @throws Exception If failed. 
+ */ + protected IgniteConfiguration commonConfiguration(int idx) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(getTestIgniteInstanceName(idx)); + + cfg.setDiscoverySpi(new TcpDiscoverySpi()); + + cfg.setMarshaller(new BinaryMarshaller()); + + return optimize(cfg); + } + + /** + * @return Default cache configuration. + */ + protected CacheConfiguration<KeyClass, ValueClass> cacheConfiguration() { + CacheConfiguration ccfg = new CacheConfiguration().setName(CACHE_NAME); + + QueryEntity entity = new QueryEntity(); + + entity.setKeyType(KeyClass.class.getName()); + entity.setValueType(ValueClass.class.getName()); + + entity.addQueryField(FIELD_KEY, Long.class.getName(), null); + entity.addQueryField(FIELD_NAME_1, Long.class.getName(), null); + entity.addQueryField(FIELD_NAME_2, Long.class.getName(), null); + + entity.setKeyFields(Collections.singleton(FIELD_KEY)); + + entity.setAliases(Collections.singletonMap(FIELD_NAME_2, alias(FIELD_NAME_2))); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + ccfg.setNodeFilter(new NodeFilter()); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setBackups(1); + + return ccfg; + } + + /** + * Ensure that schema exception is thrown. + * + * @param r Runnable. + * @param expCode Error code. + */ + protected static void assertSchemaException(RunnableX r, int expCode) { + try { + r.run(); + } + catch (SchemaOperationException e) { + assertEquals("Unexpected error code [expected=" + expCode + ", actual=" + e.code() + ']', + expCode, e.code()); + + return; + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + + fail(SchemaOperationException.class.getSimpleName() + " is not thrown."); + } + + /** + * Ensure index is used in plan. + * + * @param idxName Index name. + * @param sql SQL. + * @param args Arguments. + */ + protected static void assertIndexUsed(String idxName, String sql, Object... 
args) { + for (Ignite node : Ignition.allGrids()) + assertIndexUsed((IgniteEx)node, idxName, sql, args); + } + + /** + * Ensure index is used in plan. + * + * @param node Node. + * @param idxName Index name. + * @param sql SQL. + * @param args Arguments. + */ + protected static void assertIndexUsed(IgniteEx node, String idxName, String sql, Object... args) { + SqlFieldsQuery qry = new SqlFieldsQuery("EXPLAIN " + sql); + + if (args != null && args.length > 0) + qry.setArgs(args); + + String plan = (String)node.cache(CACHE_NAME).query(qry).getAll().get(0).get(0); + + assertTrue("Index is not used: " + plan, plan.toLowerCase().contains(idxName.toLowerCase())); + } + + /** + * Ensure index is not used in plan. + * + * @param idxName Index name. + * @param sql SQL. + * @param args Arguments. + */ + protected static void assertIndexNotUsed(String idxName, String sql, Object... args) { + for (Ignite node : Ignition.allGrids()) + assertIndexNotUsed((IgniteEx)node, idxName, sql, args); + } + + /** + * Ensure index is not used in plan. Index name is matched against the {@code EXPLAIN} output + * case-insensitively, the same way as in the "index is used" check, so that an index printed + * in a different case is not silently treated as unused. + * + * @param node Node. + * @param idxName Index name. + * @param sql SQL. + * @param args Arguments. + */ + protected static void assertIndexNotUsed(IgniteEx node, String idxName, String sql, Object... args) { + SqlFieldsQuery qry = new SqlFieldsQuery("EXPLAIN " + sql); + + if (args != null && args.length > 0) + qry.setArgs(args); + + String plan = (String)node.cache(CACHE_NAME).query(qry).getAll().get(0).get(0); + + assertFalse("Index is used: " + plan, plan.toLowerCase().contains(idxName.toLowerCase())); + } + + /** + * Create key object. + * + * @param ignite Ignite instance. + * @param id ID. + * @return Key object. + */ + protected static BinaryObject key(Ignite ignite, long id) { + return ignite.binary().builder(KeyClass.class.getName()).setField(FIELD_KEY, id).build(); + } + + /** + * Create value object. + * + * @param ignite Ignite instance. + * @param id ID. + * @return Value object. 
+ */ + protected static BinaryObject value(Ignite ignite, long id) { + return ignite.binary().builder(ValueClass.class.getName()) + .setField(FIELD_NAME_1, id) + .setField(FIELD_NAME_2, id) + .build(); + } + + /** + * Create key/value entry for the given key. + * + * @param ignite Ignite instance. + * @param id ID. + * @return Entry. + */ + protected static T2<BinaryObject, BinaryObject> entry(Ignite ignite, long id) { + return new T2<>(key(ignite, id), value(ignite, id)); + } + + /** + * Get common cache. + * + * @param node Node. + * @return Cache. + */ + protected static IgniteCache<BinaryObject, BinaryObject> cache(Ignite node) { + return node.cache(CACHE_NAME).withKeepBinary(); + } + + /** + * Get value by key ID. + * + * @param node Node. + * @param id ID. + * @return Value stored in the cache under the key built from the given ID. + */ + protected static BinaryObject get(Ignite node, int id) { + BinaryObject key = key(node, id); + + return cache(node).get(key); + } + + /** + * Put key range. + * + * @param node Node. + * @param from From key. + * @param to To key. + */ + protected static void put(Ignite node, int from, int to) { + try (IgniteDataStreamer streamer = node.dataStreamer(CACHE_NAME)) { + streamer.allowOverwrite(true); + streamer.keepBinary(true); + + for (int i = from; i < to; i++) { + BinaryObject key = key(node, i); + BinaryObject val = value(node, i); + + streamer.addData(key, val); + } + + streamer.flush(); + } + } + + /** + * Put key to cache. + * + * @param node Node. + * @param id ID. + */ + protected static void put(Ignite node, long id) { + BinaryObject key = key(node, id); + BinaryObject val = value(node, id); + + cache(node).put(key, val); + } + + /** + * Remove key range. + * + * @param node Node. + * @param from From key. + * @param to To key. + */ + protected static void remove(Ignite node, int from, int to) { + for (int i = from; i < to; i++) + remove(node, i); + } + + /** + * Remove key from cache. + * + * @param node Node. + * @param id ID. 
+ */ + protected static void remove(Ignite node, long id) { + BinaryObject key = key(node, id); + + cache(node).remove(key); + } + + /** + * @return Random string. + */ + protected static String randomString() { + return UUID.randomUUID().toString(); + } + + /** + * Assert SQL simple data state. + * + * @param sql SQL query. + * @param expSize Expected size. + */ + protected static void assertSqlSimpleData(String sql, int expSize) { + for (Ignite node : Ignition.allGrids()) + assertSqlSimpleData(node, sql, expSize); + } + + /** + * Assert SQL simple data state. + * + * @param node Node. + * @param sql SQL query. + * @param expSize Expected size. + */ + protected static void assertSqlSimpleData(Ignite node, String sql, int expSize) { + SqlQuery qry = new SqlQuery(tableName(ValueClass.class), sql).setArgs(SQL_ARG_1); + + List<Cache.Entry<BinaryObject, BinaryObject>> res = node.cache(CACHE_NAME).withKeepBinary().query(qry).getAll(); + + Set<Long> ids = new HashSet<>(); + + for (Cache.Entry<BinaryObject, BinaryObject> entry : res) { + long id = entry.getKey().field(FIELD_KEY); + + long field1 = entry.getValue().field(FIELD_NAME_1); + long field2 = entry.getValue().field(FIELD_NAME_2); + + assertTrue(field1 >= SQL_ARG_1); + + assertEquals(id, field1); + assertEquals(id, field2); + + assertTrue(ids.add(id)); + } + + assertEquals("Size mismatch [node=" + node.name() + ", exp=" + expSize + ", actual=" + res.size() + + ", ids=" + ids + ']', expSize, res.size()); + } + + /** + * Assert SQL composite data state. + * + * @param node Node. + * @param sql SQL query. + * @param expSize Expected size. 
+ */ + protected static void assertSqlCompositeData(Ignite node, String sql, int expSize) { + SqlQuery qry = new SqlQuery(tableName(ValueClass.class), sql).setArgs(SQL_ARG_1, SQL_ARG_2); + + List<Cache.Entry<BinaryObject, BinaryObject>> res = node.cache(CACHE_NAME).withKeepBinary().query(qry).getAll(); + + Set<Long> ids = new HashSet<>(); + + for (Cache.Entry<BinaryObject, BinaryObject> entry : res) { + long id = entry.getKey().field(FIELD_KEY); + + long field1 = entry.getValue().field(FIELD_NAME_1); + long field2 = entry.getValue().field(FIELD_NAME_2); + + assertTrue(field1 >= SQL_ARG_2); + + assertEquals(id, field1); + assertEquals(id, field2); + + assertTrue(ids.add(id)); + } + + assertEquals("Size mismatch [exp=" + expSize + ", actual=" + res.size() + ", ids=" + ids + ']', + expSize, res.size()); + } + + /** + * Node filter. + */ + protected static class NodeFilter implements IgnitePredicate<ClusterNode>, Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public boolean apply(ClusterNode node) { + return node.attribute(ATTR_FILTERED) == null; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexClientBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexClientBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexClientBasicSelfTest.java new file mode 100644 index 0000000..10f4f85 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexClientBasicSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +/** + * Test dynamic schema operations from client node. + */ +public class DynamicIndexClientBasicSelfTest extends DynamicIndexAbstractBasicSelfTest { + /** {@inheritDoc} */ + @Override protected int nodeIndex() { + return IDX_CLI; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedAtomicConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedAtomicConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedAtomicConcurrentSelfTest.java new file mode 100644 index 0000000..497ec39 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedAtomicConcurrentSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Concurrency tests for dynamic index create/drop for PARTITIONED/ATOMIC cache. + */ +public class DynamicIndexPartitionedAtomicConcurrentSelfTest extends DynamicIndexAbstractConcurrentSelfTest { + /** + * Constructor. + */ + public DynamicIndexPartitionedAtomicConcurrentSelfTest() { + super(CacheMode.PARTITIONED, CacheAtomicityMode.ATOMIC); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedTransactionalConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedTransactionalConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedTransactionalConcurrentSelfTest.java new file mode 100644 index 0000000..fed0149 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexPartitionedTransactionalConcurrentSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Concurrency tests for dynamic index create/drop for PARTITIONED/TRANSACTIONAL cache. + */ +public class DynamicIndexPartitionedTransactionalConcurrentSelfTest extends DynamicIndexAbstractConcurrentSelfTest { + /** + * Constructor. 
+ */ + public DynamicIndexPartitionedTransactionalConcurrentSelfTest() { + super(CacheMode.PARTITIONED, CacheAtomicityMode.TRANSACTIONAL); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedAtomicConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedAtomicConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedAtomicConcurrentSelfTest.java new file mode 100644 index 0000000..2c6c9a9 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedAtomicConcurrentSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Concurrency tests for dynamic index create/drop for REPLICATED/ATOMIC cache. 
+ */ +public class DynamicIndexReplicatedAtomicConcurrentSelfTest extends DynamicIndexAbstractConcurrentSelfTest { + /** + * Constructor. + */ + public DynamicIndexReplicatedAtomicConcurrentSelfTest() { + super(CacheMode.REPLICATED, CacheAtomicityMode.ATOMIC); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedTransactionalConcurrentSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedTransactionalConcurrentSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedTransactionalConcurrentSelfTest.java new file mode 100644 index 0000000..9dc92a4 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexReplicatedTransactionalConcurrentSelfTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** + * Concurrency tests for dynamic index create/drop for REPLICATED/TRANSACTIONAL cache. + */ +public class DynamicIndexReplicatedTransactionalConcurrentSelfTest extends DynamicIndexAbstractConcurrentSelfTest { + /** + * Constructor. + */ + public DynamicIndexReplicatedTransactionalConcurrentSelfTest() { + super(CacheMode.REPLICATED, CacheAtomicityMode.TRANSACTIONAL); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerBasicSelfTest.java new file mode 100644 index 0000000..c014229 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerBasicSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +/** + * Test dynamic schema operations from non-coordinator node. + */ +public class DynamicIndexServerBasicSelfTest extends DynamicIndexAbstractBasicSelfTest { + /** {@inheritDoc} */ + @Override protected int nodeIndex() { + return IDX_SRV_NON_CRD; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerCoordinatorBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerCoordinatorBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerCoordinatorBasicSelfTest.java new file mode 100644 index 0000000..7427a4c --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerCoordinatorBasicSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +/** + * Test dynamic schema operations from coordinator node. + */ +public class DynamicIndexServerCoordinatorBasicSelfTest extends DynamicIndexAbstractBasicSelfTest { + /** {@inheritDoc} */ + @Override protected int nodeIndex() { + return IDX_SRV_CRD; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFIlterBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFIlterBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFIlterBasicSelfTest.java new file mode 100644 index 0000000..b8acd1d --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFIlterBasicSelfTest.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +/** + * Test dynamic schema operations from server node which does not pass node filter. + */ +public class DynamicIndexServerNodeFIlterBasicSelfTest extends DynamicIndexAbstractBasicSelfTest { + /** {@inheritDoc} */ + @Override protected int nodeIndex() { + return IDX_SRV_FILTERED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFilterCoordinatorBasicSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFilterCoordinatorBasicSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFilterCoordinatorBasicSelfTest.java new file mode 100644 index 0000000..e297fe1 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/DynamicIndexServerNodeFilterCoordinatorBasicSelfTest.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.configuration.IgniteConfiguration; + +/** + * Test dynamic schema operations from server node which does not pass node filter and which is coordinator. + */ +public class DynamicIndexServerNodeFilterCoordinatorBasicSelfTest extends DynamicIndexServerCoordinatorBasicSelfTest { + /** {@inheritDoc} */ + @Override protected IgniteConfiguration serverCoordinatorConfiguration(int idx) throws Exception { + return serverConfiguration(idx, true); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java new file mode 100644 index 0000000..cf563cc --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAbstractSelfTest.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import javax.cache.CacheException; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.processors.cache.query.IgniteQueryErrorCode; +import org.apache.ignite.internal.processors.query.IgniteSQLException; +import org.apache.ignite.internal.util.typedef.F; + +/** + * Test that checks indexes handling on H2 side. + */ +public abstract class H2DynamicIndexAbstractSelfTest extends AbstractSchemaSelfTest { + /** Client node index. 
*/ + private final static int CLIENT = 2; + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + for (IgniteConfiguration cfg : configurations()) + Ignition.start(cfg); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + client().getOrCreateCache(cacheConfiguration()); + + assertNoIndex(CACHE_NAME, TBL_NAME, IDX_NAME_1); + + IgniteCache<KeyClass, ValueClass> cache = client().cache(CACHE_NAME); + + cache.put(new KeyClass(1), new ValueClass("val1")); + cache.put(new KeyClass(2), new ValueClass("val2")); + cache.put(new KeyClass(3), new ValueClass("val3")); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + client().destroyCache(CACHE_NAME); + + super.afterTest(); + } + + /** + * Test that after index creation index is used by queries. + */ + public void testCreateIndex() throws Exception { + IgniteCache<KeyClass, ValueClass> cache = cache(); + + assertSize(3); + + cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\"" + + FIELD_NAME_1 + "\" ASC)")).getAll(); + + // Test that local queries on all nodes use new index. 
+ for (int i = 0 ; i < 4; i++) { + List<List<?>> locRes = ignite(i).cache("cache").query(new SqlFieldsQuery("explain select \"id\" from " + + "\"cache\".\"ValueClass\" where \"field1\" = 'A'").setLocal(true)).getAll(); + + assertEquals(F.asList( + Collections.singletonList("SELECT\n" + + " \"id\"\n" + + "FROM \"cache\".\"ValueClass\"\n" + + " /* \"cache\".\"idx_1\": \"field1\" = 'A' */\n" + + "WHERE \"field1\" = 'A'") + ), locRes); + } + + assertSize(3); + + cache.remove(new KeyClass(2)); + + assertSize(2); + + cache.put(new KeyClass(4), new ValueClass("someVal")); + + assertSize(3); + } + + /** + * Test that creating an index with duplicate name yields an error. + */ + public void testCreateIndexWithDuplicateName() { + final IgniteCache<KeyClass, ValueClass> cache = cache(); + + cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\"" + + FIELD_NAME_1 + "\" ASC)")); + + assertSqlException(new RunnableX() { + @Override public void run() throws Exception { + cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\"id\" ASC)")); + } + }, IgniteQueryErrorCode.INDEX_ALREADY_EXISTS); + } + + /** + * Test that creating an index with duplicate name does not yield an error with {@code IF NOT EXISTS}. + */ + public void testCreateIndexIfNotExists() { + final IgniteCache<KeyClass, ValueClass> cache = cache(); + + cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\"" + + FIELD_NAME_1 + "\" ASC)")); + + cache.query(new SqlFieldsQuery("CREATE INDEX IF NOT EXISTS \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + + "\"(\"id\" ASC)")); + } + + /** + * Test that after index drop there are no attempts to use it, and data state remains intact. 
+ */ + public void testDropIndex() { + IgniteCache<KeyClass, ValueClass> cache = cache(); + + assertSize(3); + + cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\"" + + FIELD_NAME_1 + "\" ASC)")); + + assertSize(3); + + cache.query(new SqlFieldsQuery("DROP INDEX \"" + IDX_NAME_1 + "\"")); + + // Test that no local queries on all nodes use new index. + for (int i = 0 ; i < 4; i++) { + List<List<?>> locRes = ignite(i).cache("cache").query(new SqlFieldsQuery("explain select \"id\" from " + + "\"cache\".\"ValueClass\" where \"field1\" = 'A'").setLocal(true)).getAll(); + + assertEquals(F.asList( + Collections.singletonList("SELECT\n" + + " \"id\"\n" + + "FROM \"cache\".\"ValueClass\"\n" + + " /* \"cache\".\"ValueClass\".__SCAN_ */\n" + + "WHERE \"field1\" = 'A'") + ), locRes); + } + + assertSize(3); + } + + /** + * Test that dropping a non-existent index yields an error. + */ + public void testDropMissingIndex() { + final IgniteCache<KeyClass, ValueClass> cache = cache(); + + assertSqlException(new RunnableX() { + @Override public void run() throws Exception { + cache.query(new SqlFieldsQuery("DROP INDEX \"" + IDX_NAME_1 + "\"")); + } + }, IgniteQueryErrorCode.INDEX_NOT_FOUND); + } + + /** + * Test that dropping a non-existent index does not yield an error with {@code IF EXISTS}. + */ + public void testDropMissingIndexIfExists() { + final IgniteCache<KeyClass, ValueClass> cache = cache(); + + cache.query(new SqlFieldsQuery("DROP INDEX IF EXISTS \"" + IDX_NAME_1 + "\"")); + } + + /** + * Test that changes in cache affect index, and vice versa. 
+ */ + public void testIndexState() { + IgniteCache<KeyClass, ValueClass> cache = cache(); + + assertColumnValues("val1", "val2", "val3"); + + cache.query(new SqlFieldsQuery("CREATE INDEX \"" + IDX_NAME_1 + "\" ON \"" + TBL_NAME + "\"(\"" + + FIELD_NAME_1 + "\" ASC)")); + + assertColumnValues("val1", "val2", "val3"); + + cache.remove(new KeyClass(2)); + + assertColumnValues("val1", "val3"); + + cache.put(new KeyClass(0), new ValueClass("someVal")); + + assertColumnValues("someVal", "val1", "val3"); + + cache.query(new SqlFieldsQuery("DROP INDEX \"" + IDX_NAME_1 + "\"")); + + assertColumnValues("someVal", "val1", "val3"); + } + + /** + * Check that values of the {@code FIELD_NAME_1} column match what we expect. + * The values are selected ordered by {@code id} and compared with the expected ones as an ordered list. + * + * @param vals Expected values, in the order the rows are returned. + */ + private void assertColumnValues(String... vals) { + List<List<?>> expRes = new ArrayList<>(vals.length); + + for (String v : vals) + expRes.add(Collections.singletonList(v)); + + assertEquals(expRes, cache().query(new SqlFieldsQuery("SELECT \"" + FIELD_NAME_1 + "\" FROM \"" + TBL_NAME + + "\" ORDER BY \"id\"")) + .getAll()); + } + + /** + * Check that both {@code cache().size()} and a {@code SELECT COUNT(*)} query report the expected size. + * + * @param expSize Expected number of items in table. + */ + private void assertSize(long expSize) { + assertEquals(expSize, cache().size()); + + assertEquals(expSize, cache().query(new SqlFieldsQuery("SELECT COUNT(*) from \"ValueClass\"")) + .getAll().get(0).get(0)); + } + + /** + * Get configurations to be used in test: server nodes at indexes 0, 1 and 3, and a client node at index 2. + * + * @return Configurations. + * @throws Exception If failed. + */ + private List<IgniteConfiguration> configurations() throws Exception { + return Arrays.asList( + serverConfiguration(0), + serverConfiguration(1), + clientConfiguration(2), + serverConfiguration(3) + ); + } + + /** + * @return Client node. + */ + private Ignite client() { + return ignite(CLIENT); + } + + /** + * @return Cache.
+ */ + private IgniteCache<KeyClass, ValueClass> cache() { + return client().cache(CACHE_NAME); + } + + /** + * Create server configuration, which is the common node configuration returned as is. + * + * @param idx Node index passed on to {@code commonConfiguration}. + * @return Configuration. + * @throws Exception If failed. + */ + private IgniteConfiguration serverConfiguration(int idx) throws Exception { + return commonConfiguration(idx); + } + + /** + * Create client configuration, which is the common node configuration with client mode enabled. + * + * @param idx Node index passed on to {@code commonConfiguration}. + * @return Configuration. + * @throws Exception If failed. + */ + private IgniteConfiguration clientConfiguration(int idx) throws Exception { + return commonConfiguration(idx).setClientMode(true); + } + + /** + * Create common node configuration: the parent class configuration for the instance name derived from + * {@code idx}, with a {@code BinaryMarshaller} set, passed through {@code optimize}. + * + * @param idx Node index used to obtain the test Ignite instance name. + * @return Configuration. + * @throws Exception If failed. + */ + private IgniteConfiguration commonConfiguration(int idx) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(getTestIgniteInstanceName(idx)); + + cfg.setMarshaller(new BinaryMarshaller()); + + return optimize(cfg); + } + + /** + * @return Default cache configuration.
+ */ + private CacheConfiguration cacheConfiguration() { + CacheConfiguration<KeyClass, ValueClass> ccfg = new CacheConfiguration<KeyClass, ValueClass>() + .setName(CACHE_NAME); + + QueryEntity entity = new QueryEntity(); + + entity.setKeyType(KeyClass.class.getName()); + entity.setValueType(ValueClass.class.getName()); + + entity.addQueryField("id", Long.class.getName(), null); + entity.addQueryField(FIELD_NAME_1, String.class.getName(), null); + entity.addQueryField(FIELD_NAME_2, String.class.getName(), null); + + entity.setKeyFields(Collections.singleton("id")); + + entity.setAliases(Collections.singletonMap(FIELD_NAME_2, alias(FIELD_NAME_2))); + + ccfg.setQueryEntities(Collections.singletonList(entity)); + + ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + ccfg.setSqlEscapeAll(true); + ccfg.setAtomicityMode(atomicityMode()); + ccfg.setCacheMode(cacheMode()); + + if (nearCache()) + ccfg.setNearConfiguration(new NearCacheConfiguration<KeyClass, ValueClass>()); + + return ccfg; + } + + /** + * Supplies the value passed to {@code setCacheMode} when the test cache configuration is built. + * + * @return Cache mode to use. + */ + protected abstract CacheMode cacheMode(); + + /** + * Supplies the value passed to {@code setAtomicityMode} when the test cache configuration is built. + * + * @return Cache atomicity mode to use. + */ + protected abstract CacheAtomicityMode atomicityMode(); + + /** + * Decides whether a {@code NearCacheConfiguration} is set on the test cache configuration. + * + * @return Whether to use near cache. + */ + protected abstract boolean nearCache(); + + /** + * Ensure that SQL exception is thrown. + * + * @param r Runnable. + * @param expCode Error code.
+ */ + private static void assertSqlException(DynamicIndexAbstractBasicSelfTest.RunnableX r, int expCode) { + try { + try { + r.run(); + } + catch (CacheException e) { + /* Rethrow the cause, when present, so that the outer handlers see the exception wrapped by CacheException. */ + /* NOTE(review): the cast assumes the cause is an Exception; any other Throwable would raise ClassCastException here. */ + if (e.getCause() != null) + throw (Exception)e.getCause(); + else + throw e; + } + } + catch (IgniteSQLException e) { + assertEquals("Unexpected error code [expected=" + expCode + ", actual=" + e.statusCode() + ']', + expCode, e.statusCode()); + + return; + } + catch (Exception e) { + fail("Unexpected exception: " + e); + } + + /* Reached only when r.run() completed without throwing an exception. */ + fail(IgniteSQLException.class.getSimpleName() + " is not thrown."); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedNearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedNearSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedNearSelfTest.java new file mode 100644 index 0000000..96a7c14 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedNearSelfTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +/** Variant of {@link H2DynamicIndexAtomicPartitionedSelfTest} that enables the near cache. */ +public class H2DynamicIndexAtomicPartitionedNearSelfTest extends H2DynamicIndexAtomicPartitionedSelfTest { + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedSelfTest.java new file mode 100644 index 0000000..0a4c48c --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicPartitionedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** {@link H2DynamicIndexAbstractSelfTest} specialization: {@code PARTITIONED} cache mode, {@code ATOMIC} atomicity, no near cache. */ +public class H2DynamicIndexAtomicPartitionedSelfTest extends H2DynamicIndexAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicReplicatedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicReplicatedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicReplicatedSelfTest.java new file mode 100644 index 0000000..fc9f9e7 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexAtomicReplicatedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** {@link H2DynamicIndexAbstractSelfTest} specialization: {@code REPLICATED} cache mode, {@code ATOMIC} atomicity, no near cache. */ +public class H2DynamicIndexAtomicReplicatedSelfTest extends H2DynamicIndexAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.ATOMIC; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedNearSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedNearSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedNearSelfTest.java new file mode 100644 index 0000000..e8c4fb2 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedNearSelfTest.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +/** Variant of {@link H2DynamicIndexTransactionalPartitionedSelfTest} that enables the near cache. */ +public class H2DynamicIndexTransactionalPartitionedNearSelfTest extends H2DynamicIndexTransactionalPartitionedSelfTest { + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/2edb935c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedSelfTest.java new file mode 100644 index 0000000..ad61412 --- /dev/null +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/H2DynamicIndexTransactionalPartitionedSelfTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.index; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; + +/** {@link H2DynamicIndexAbstractSelfTest} specialization: {@code PARTITIONED} cache mode, {@code TRANSACTIONAL} atomicity, no near cache. */ +public class H2DynamicIndexTransactionalPartitionedSelfTest extends H2DynamicIndexAbstractSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.PARTITIONED; + } + + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return CacheAtomicityMode.TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected boolean nearCache() { + return false; + } +}