http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java deleted file mode 100644 index 2b81162..0000000 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFactoryFilterTest.java +++ /dev/null @@ -1,714 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.cache.query.continuous; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; -import javax.cache.configuration.CacheEntryListenerConfiguration; -import javax.cache.configuration.Factory; -import javax.cache.configuration.FactoryBuilder; -import javax.cache.configuration.MutableCacheEntryListenerConfiguration; -import javax.cache.event.CacheEntryCreatedListener; -import javax.cache.event.CacheEntryEvent; -import javax.cache.event.CacheEntryExpiredListener; -import javax.cache.event.CacheEntryListenerException; -import javax.cache.event.CacheEntryRemovedListener; -import javax.cache.event.CacheEntryUpdatedListener; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCache; -import org.apache.ignite.cache.CacheEntryEventSerializableFilter; -import org.apache.ignite.cache.affinity.Affinity; -import org.apache.ignite.cache.query.CacheQueryEntryEvent; -import org.apache.ignite.cache.query.ContinuousQuery; -import org.apache.ignite.cache.query.QueryCursor; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.transactions.Transaction; -import org.apache.ignite.transactions.TransactionConcurrency; -import org.apache.ignite.transactions.TransactionIsolation; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; -import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; -import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; -import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; -import static org.apache.ignite.cache.CacheMode.REPLICATED; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFactoryFilterTest.NonSerializableFilter.isAccepted; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.CLIENT; -import static org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryRandomOperationsTest.ContinuousDeploy.SERVER; -import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; -import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; -import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; - -/** - * - */ -public class CacheContinuousQueryFactoryFilterTest extends CacheContinuousQueryRandomOperationsTest { - /** */ - private static final int NODES = 5; - - /** */ - private static final int KEYS = 50; - - /** */ - private static final int VALS = 10; - - /** */ - public static final int ITERATION_CNT = 40; - - /** - * @throws Exception If failed. - */ - public void testInternalQuery() throws Exception { - CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, - 1, - ATOMIC, - ONHEAP_TIERED, - false); - - final IgniteCache<Object, Object> cache = grid(0).getOrCreateCache(ccfg); - - UUID uuid = null; - - try { - for (int i = 0; i < 10; i++) - cache.put(i, i); - - final CountDownLatch latch = new CountDownLatch(5); - - CacheEntryUpdatedListener lsnr = new CacheEntryUpdatedListener() { - @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException { - for (Object evt : iterable) { - latch.countDown(); - - log.info("Received event: " + evt); - } - } - }; - - uuid = grid(0).context().cache().cache(cache.getName()).context().continuousQueries() - .executeInternalQuery(lsnr, new SerializableFilter(), false, true, true); - - for (int i = 10; i < 20; i++) - cache.put(i, i); - - assertTrue(latch.await(3, SECONDS)); - } - finally { - if (uuid != null) - grid(0).context().cache().cache(cache.getName()).context().continuousQueries() - .cancelInternalQuery(uuid); - - cache.destroy(); - } - } - - /** {@inheritDoc} */ - @Override protected void doTestContinuousQuery(CacheConfiguration<Object, Object> ccfg, ContinuousDeploy deploy) - throws Exception { - ignite(0).createCache(ccfg); - - try { - long seed = System.currentTimeMillis(); - - Random rnd = new Random(seed); - - log.info("Random seed: " + seed); - - List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues = new ArrayList<>(); - - Collection<QueryCursor<?>> curs = new ArrayList<>(); - - Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs = new ArrayList<>(); - - if (deploy == CLIENT) - evtsQueues.add(registerListener(ccfg.getName(), NODES - 1, curs, lsnrCfgs, rnd.nextBoolean())); - else if (deploy == SERVER) - evtsQueues.add(registerListener(ccfg.getName(), rnd.nextInt(NODES - 1), curs, lsnrCfgs, - rnd.nextBoolean())); - else { - boolean isSync = rnd.nextBoolean(); - - for (int i = 0; i < NODES - 1; i++) - evtsQueues.add(registerListener(ccfg.getName(), i, curs, lsnrCfgs, isSync)); - } - - ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); - - Map<Integer, Long> partCntr = new ConcurrentHashMap<>(); - - try { - for (int i = 0; i < ITERATION_CNT; i++) { - if (i % 10 == 0) - log.info("Iteration: " + i); - - for (int idx = 0; idx < NODES; idx++) - randomUpdate(rnd, evtsQueues, expData, partCntr, grid(idx).cache(ccfg.getName())); - } - } - finally { - for (QueryCursor<?> cur : curs) - cur.close(); - - for (T2<Integer, MutableCacheEntryListenerConfiguration> e : lsnrCfgs) - grid(e.get1()).cache(ccfg.getName()).deregisterCacheEntryListener(e.get2()); - } - } - finally { - ignite(0).destroyCache(ccfg.getName()); - } - } - - /** - * @param cacheName Cache name. - * @param nodeIdx Node index. - * @param curs Cursors. - * @param lsnrCfgs Listener configurations. - * @return Event queue - */ - private BlockingQueue<CacheEntryEvent<?, ?>> registerListener(String cacheName, - int nodeIdx, - Collection<QueryCursor<?>> curs, - Collection<T2<Integer, MutableCacheEntryListenerConfiguration>> lsnrCfgs, - boolean sync) { - final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = new ArrayBlockingQueue<>(50_000); - - if (ThreadLocalRandom.current().nextBoolean()) { - MutableCacheEntryListenerConfiguration<QueryTestKey, QueryTestValue> lsnrCfg = - new MutableCacheEntryListenerConfiguration<>( - FactoryBuilder.factoryOf(new LocalNonSerialiseListener() { - @Override protected void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }), - new FilterFactory(), - true, - sync - ); - - grid(nodeIdx).cache(cacheName).registerCacheEntryListener((CacheEntryListenerConfiguration)lsnrCfg); - - lsnrCfgs.add(new T2<Integer, MutableCacheEntryListenerConfiguration>(nodeIdx, lsnrCfg)); - } - else { - ContinuousQuery<QueryTestKey, QueryTestValue> qry = new ContinuousQuery<>(); - - qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - for (CacheEntryEvent<?, ?> evt : evts) - evtsQueue.add(evt); - } - }); - - qry.setRemoteFilterFactory(new FilterFactory()); - - QueryCursor<?> cur = grid(nodeIdx).cache(cacheName).query(qry); - - curs.add(cur); - } - - return evtsQueue; - } - - /** - * @param rnd Random generator. - * @param evtsQueues Events queue. - * @param expData Expected cache data. - * @param partCntr Partition counter. - * @param cache Cache. - * @throws Exception If failed. - */ - private void randomUpdate( - Random rnd, - List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, - ConcurrentMap<Object, Object> expData, - Map<Integer, Long> partCntr, - IgniteCache<Object, Object> cache) - throws Exception { - Object key = new QueryTestKey(rnd.nextInt(KEYS)); - Object newVal = value(rnd); - Object oldVal = expData.get(key); - - int op = rnd.nextInt(11); - - Ignite ignite = cache.unwrap(Ignite.class); - - Transaction tx = null; - - if (cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == TRANSACTIONAL && rnd.nextBoolean()) - tx = ignite.transactions().txStart(txRandomConcurrency(rnd), txRandomIsolation(rnd)); - - try { - // log.info("Random operation [key=" + key + ", op=" + op + ']'); - - switch (op) { - case 0: { - cache.put(key, newVal); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - - break; - } - - case 1: { - cache.getAndPut(key, newVal); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - - break; - } - - case 2: { - cache.remove(key); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); - - expData.remove(key); - - break; - } - - case 3: { - cache.getAndRemove(key); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); - - expData.remove(key); - - break; - } - - case 4: { - cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - - break; - } - - case 5: { - cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, null, oldVal); - - expData.remove(key); - - break; - } - - case 6: { - cache.putIfAbsent(key, newVal); - - if (tx != null) - tx.commit(); - - if (oldVal == null) { - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); - - expData.put(key, newVal); - } - else - checkNoEvent(evtsQueues); - - break; - } - - case 7: { - cache.getAndPutIfAbsent(key, newVal); - - if (tx != null) - tx.commit(); - - if (oldVal == null) { - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, null); - - expData.put(key, newVal); - } - else - checkNoEvent(evtsQueues); - - break; - } - - case 8: { - cache.replace(key, newVal); - - if (tx != null) - tx.commit(); - - if (oldVal != null) { - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - } - else - checkNoEvent(evtsQueues); - - break; - } - - case 9: { - cache.getAndReplace(key, newVal); - - if (tx != null) - tx.commit(); - - if (oldVal != null) { - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - } - else - checkNoEvent(evtsQueues); - - break; - } - - case 10: { - if (oldVal != null) { - Object replaceVal = value(rnd); - - boolean success = replaceVal.equals(oldVal); - - if (success) { - cache.replace(key, replaceVal, newVal); - - if (tx != null) - tx.commit(); - - updatePartitionCounter(cache, key, partCntr); - - waitAndCheckEvent(evtsQueues, partCntr, affinity(cache), key, newVal, oldVal); - - expData.put(key, newVal); - } - else { - cache.replace(key, replaceVal, newVal); - - if (tx != null) - tx.commit(); - - checkNoEvent(evtsQueues); - } - } - else { - cache.replace(key, value(rnd), newVal); - - if (tx != null) - tx.commit(); - - checkNoEvent(evtsQueues); - } - - break; - } - - default: - fail("Op:" + op); - } - } finally { - if (tx != null) - tx.close(); - } - } - - /** - * @param rnd {@link Random}. - * @return {@link TransactionIsolation}. - */ - private TransactionIsolation txRandomIsolation(Random rnd) { - int val = rnd.nextInt(3); - - if (val == 0) - return READ_COMMITTED; - else if (val == 1) - return REPEATABLE_READ; - else - return SERIALIZABLE; - } - - /** - * @param rnd {@link Random}. - * @return {@link TransactionConcurrency}. - */ - private TransactionConcurrency txRandomConcurrency(Random rnd) { - return rnd.nextBoolean() ? TransactionConcurrency.OPTIMISTIC : TransactionConcurrency.PESSIMISTIC; - } - - /** - * @param cache Cache. - * @param key Key - * @param cntrs Partition counters. - */ - private void updatePartitionCounter(IgniteCache<Object, Object> cache, Object key, Map<Integer, Long> cntrs) { - Affinity<Object> aff = cache.unwrap(Ignite.class).affinity(cache.getName()); - - int part = aff.partition(key); - - Long partCntr = cntrs.get(part); - - if (partCntr == null) - partCntr = 0L; - - cntrs.put(part, ++partCntr); - } - - /** - * @param rnd Random generator. - * @return Cache value. - */ - private static Object value(Random rnd) { - return new QueryTestValue(rnd.nextInt(VALS)); - } - - /** - * @param evtsQueues Event queue. - * @param partCntrs Partition counters. - * @param aff Affinity function. - * @param key Key. - * @param val Value. - * @param oldVal Old value. - * @throws Exception If failed. - */ - private void waitAndCheckEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues, - Map<Integer, Long> partCntrs, - Affinity<Object> aff, - Object key, - Object val, - Object oldVal) - throws Exception { - if ((val == null && oldVal == null - || (val != null && !isAccepted((QueryTestValue)val)))) { - checkNoEvent(evtsQueues); - - return; - } - - for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { - CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); - - assertNotNull("Failed to wait for event [key=" + key + ", val=" + val + ", oldVal=" + oldVal + ']', evt); - assertEquals(key, evt.getKey()); - assertEquals(val, evt.getValue()); - assertEquals(oldVal, evt.getOldValue()); - - long cntr = partCntrs.get(aff.partition(key)); - CacheQueryEntryEvent qryEntryEvt = evt.unwrap(CacheQueryEntryEvent.class); - - assertNotNull(cntr); - assertNotNull(qryEntryEvt); - - assertEquals(cntr, qryEntryEvt.getPartitionUpdateCounter()); - } - } - - /** - * @param evtsQueues Event queue. - * @throws Exception If failed. - */ - private void checkNoEvent(List<BlockingQueue<CacheEntryEvent<?, ?>>> evtsQueues) throws Exception { - for (BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue : evtsQueues) { - CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); - - assertNull(evt); - } - } - - /** - * - */ - protected static class NonSerializableFilter - implements CacheEntryEventSerializableFilter<CacheContinuousQueryRandomOperationsTest.QueryTestKey, - CacheContinuousQueryRandomOperationsTest.QueryTestValue>, Externalizable { - /** */ - public NonSerializableFilter() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> event) - throws CacheEntryListenerException { - return isAccepted(event.getValue()); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - fail("Entry filter should not be marshaled."); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - fail("Entry filter should not be marshaled."); - } - - /** - * @return {@code True} if value is even. - */ - public static boolean isAccepted(QueryTestValue val) { - return val == null || val.val1 % 2 == 0; - } - } - - /** - * - */ - protected static class SerializableFilter implements CacheEntryEventSerializableFilter<Integer, Integer>{ - /** */ - public SerializableFilter() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> event) - throws CacheEntryListenerException { - return isAccepted(event.getValue()); - } - - /** - * @return {@code True} if value is even. - */ - public static boolean isAccepted(Integer val) { - return val == null || val % 2 == 0; - } - } - - /** - * - */ - protected static class FilterFactory implements Factory<NonSerializableFilter> { - @Override public NonSerializableFilter create() { - return new NonSerializableFilter(); - } - } - - /** - * - */ - public abstract class LocalNonSerialiseListener implements - CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>, - CacheEntryCreatedListener<QueryTestKey, QueryTestValue>, - CacheEntryExpiredListener<QueryTestKey, QueryTestValue>, - CacheEntryRemovedListener<QueryTestKey, QueryTestValue>, - Externalizable { - /** */ - public LocalNonSerialiseListener() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public void onCreated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - onEvents(evts); - } - - /** {@inheritDoc} */ - @Override public void onExpired(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - onEvents(evts); - } - - /** {@inheritDoc} */ - @Override public void onRemoved(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - onEvents(evts); - } - - /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts) throws CacheEntryListenerException { - onEvents(evts); - } - - /** - * @param evts Events. - */ - protected abstract void onEvents(Iterable<CacheEntryEvent<? extends QueryTestKey, - ? extends QueryTestValue>> evts); - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - throw new UnsupportedOperationException("Failed. Listener should not be marshaled."); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - throw new UnsupportedOperationException("Failed. Listener should not be unmarshaled."); - } - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 32add4f..d762561 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; +import org.apache.ignite.lang.IgniteAsyncCallback; import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.plugin.extensions.communication.Message; @@ -99,7 +100,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; -import static org.apache.ignite.cache.CacheMemoryMode.*; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -167,6 +168,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } /** + * @return Async callback flag. + */ + protected boolean asyncCallback() { + return false; + } + + /** * @return Near cache configuration. */ protected NearCacheConfiguration nearCacheConfiguration() { @@ -473,7 +481,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC for (int j = 0; j < 50; ++j) { ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() + : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -557,7 +566,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -718,7 +727,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); - final CacheEventListener3 lsnr = new CacheEventListener3(); + final CacheEventListener3 lsnr = asyncCallback() ? new CacheEventAsyncListener3() : new CacheEventListener3(); qry.setLocalListener(lsnr); @@ -838,7 +847,8 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC Affinity<Object> aff = qryClient.affinity(null); - CacheEventListener1 lsnr = new CacheEventListener1(false); + CacheEventListener1 lsnr = asyncCallback() ? new CacheEventAsyncListener1(false) + : new CacheEventListener1(false); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -1535,7 +1545,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC qry.setLocalListener(lsnr); - qry.setRemoteFilter(new CacheEventFilter()); + qry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); QueryCursor<?> cur = qryClnCache.query(qry); @@ -1629,7 +1639,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC newQry.setLocalListener(dinLsnr); - newQry.setRemoteFilter(new CacheEventFilter()); + newQry.setRemoteFilter(asyncCallback() ? new CacheEventAsyncFilter() : new CacheEventFilter()); dinQry = qryClnCache.query(newQry); @@ -1776,7 +1786,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null); - final CacheEventListener2 lsnr = new CacheEventListener2(); + final CacheEventListener2 lsnr = asyncCallback() ? new CacheEventAsyncListener2() : new CacheEventListener2(); ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); @@ -2134,6 +2144,19 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncListener1 extends CacheEventListener1 { + /** + * @param saveAll Save all events flag. + */ + CacheEventAsyncListener1(boolean saveAll) { + super(saveAll); + } + } + + /** + * + */ private static class CacheEventListener1 implements CacheEntryUpdatedListener<Object, Object> { /** */ private volatile CountDownLatch latch; @@ -2198,6 +2221,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncListener2 extends CacheEventListener2 { + // No-op. + } + + /** + * + */ private static class CacheEventListener2 implements CacheEntryUpdatedListener<Object, Object> { /** */ @LoggerResource @@ -2265,6 +2296,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + public static class CacheEventAsyncListener3 extends CacheEventListener3 { + // No-op. + } + + /** + * + */ public static class CacheEventListener3 implements CacheEntryUpdatedListener<Object, Object>, CacheEntryEventSerializableFilter<Object, Object> { /** Keys. */ @@ -2293,6 +2332,14 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC /** * */ + @IgniteAsyncCallback + private static class CacheEventAsyncFilter extends CacheEventFilter { + // No-op. + } + + /** + * + */ public static class CacheEventFilter implements CacheEntryEventSerializableFilter<Object, Object> { /** {@inheritDoc} */ @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException { http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java index 025dd80..b469a86 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryLostPartitionTest.java @@ -60,6 +60,12 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes super.beforeTest(); startGridsMultiThreaded(2); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return grid(0).cluster().nodes().size() == 2; + } + }, 10000L); } /** {@inheritDoc} */ @@ -140,6 +146,14 @@ public class CacheContinuousQueryLostPartitionTest extends GridCommonAbstractTes // node2 now becomes the primary for the key. stopGrid(0); + final int prevSize = grid(1).cluster().nodes().size(); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return prevSize - 1 == grid(1).cluster().nodes().size(); + } + }, 5000L); + cache2.put(key, "2"); // Sanity check. http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java new file mode 100644 index 0000000..0d027a9 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationFromCallbackTest.java @@ -0,0 +1,627 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; + +/** + * + */ +public class CacheContinuousQueryOperationFromCallbackTest extends GridCommonAbstractTest { + /** */ + public static final int KEYS = 10; + + /** */ + public static final int KEYS_FROM_CALLBACK = 20; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + public static final int ITERATION_CNT = 20; + + /** */ + public static final int SYSTEM_POOL_SIZE = 10; + + /** */ + private boolean client; + + /** */ + private static AtomicInteger filterCbCntr = new AtomicInteger(0); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + cfg.setSystemThreadPoolSize(SYSTEM_POOL_SIZE); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(100); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + filterCbCntr.set(0); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicTwoBackups() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, FULL_SYNC); + + doTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxTwoBackupsFilter() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxTwoBackupsFilterPrimary() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, PRIMARY_SYNC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicatedFilter() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, TRANSACTIONAL, FULL_SYNC); + + doTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, FULL_SYNC); + + doTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicated() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, FULL_SYNC); + + doTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicatedPrimary() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 2, TRANSACTIONAL, PRIMARY_SYNC); + + doTest(ccfg, true); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + protected void doTest(final CacheConfiguration ccfg, boolean fromLsnr) throws Exception { + ignite(0).createCache(ccfg); + + List<QueryCursor<?>> qries = new ArrayList<>(); + + assertEquals(0, filterCbCntr.get()); + + try { + List<Set<T2<QueryTestKey, QueryTestValue>>> rcvdEvts = new ArrayList<>(NODES); + List<Set<T2<QueryTestKey, QueryTestValue>>> evtsFromCallbacks = new ArrayList<>(NODES); + + final AtomicInteger qryCntr = new AtomicInteger(0); + + final AtomicInteger cbCntr = new AtomicInteger(0); + + final int threadCnt = SYSTEM_POOL_SIZE * 2; + + for (int idx = 0; idx < NODES; idx++) { + Set<T2<QueryTestKey, QueryTestValue>> evts = Collections. + newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>()); + Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb = Collections. + newSetFromMap(new ConcurrentHashMap<T2<QueryTestKey, QueryTestValue>, Boolean>()); + + IgniteCache<Object, Object> cache = grid(idx).getOrCreateCache(ccfg.getName()); + + ContinuousQuery qry = new ContinuousQuery(); + + qry.setLocalListener(new TestCacheAsyncEventListener(evts, evtsFromCb, + fromLsnr ? cache : null, qryCntr, cbCntr)); + + if (!fromLsnr) + qry.setRemoteFilterFactory( + FactoryBuilder.factoryOf(new CacheTestRemoteFilterAsync(ccfg.getName()))); + + rcvdEvts.add(evts); + evtsFromCallbacks.add(evtsFromCb); + + QueryCursor qryCursor = cache.query(qry); + + qries.add(qryCursor); + } + + IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < ITERATION_CNT; i++) { + IgniteCache<QueryTestKey, QueryTestValue> cache = + grid(rnd.nextInt(NODES)).cache(ccfg.getName()); + + QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS)); + + boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == + TRANSACTIONAL && rnd.nextBoolean(); + + Transaction tx = null; + + if (startTx) + tx = cache.unwrap(Ignite.class).transactions().txStart(); + + try { + if ((cache.get(key) == null) || rnd.nextBoolean()) + cache.invoke(key, new IncrementTestEntryProcessor()); + else { + QueryTestValue val; + QueryTestValue newVal; + + do { + val = cache.get(key); + + newVal = val == null ? + new QueryTestValue(0) : new QueryTestValue(val.val1 + 1); + } + while (!cache.replace(key, val, newVal)); + } + } + finally { + if (tx != null) + tx.commit(); + } + } + } + }, threadCnt, "put-thread"); + + f.get(30, TimeUnit.SECONDS); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return qryCntr.get() >= ITERATION_CNT * threadCnt * NODES; + } + }, TimeUnit.MINUTES.toMillis(2)); + + for (Set<T2<QueryTestKey, QueryTestValue>> set : rcvdEvts) + checkEvents(set, ITERATION_CNT * threadCnt, grid(0).cache(ccfg.getName()), false); + + if (fromLsnr) { + final int expCnt = qryCntr.get() * NODES * KEYS_FROM_CALLBACK; + + boolean res = GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return cbCntr.get() >= expCnt; + } + }, TimeUnit.SECONDS.toMillis(60)); + + assertTrue("Failed to wait events [exp=" + expCnt + ", act=" + cbCntr.get() + "]", res); + + assertEquals(expCnt, cbCntr.get()); + + for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks) + checkEvents(set, qryCntr.get() * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true); + } + else { + final int expInvkCnt = ITERATION_CNT * threadCnt * + (ccfg.getCacheMode() != REPLICATED ? (ccfg.getBackups() + 1) : NODES - 1) * NODES; + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return filterCbCntr.get() >= expInvkCnt; + } + }, TimeUnit.SECONDS.toMillis(60)); + + assertEquals(expInvkCnt, filterCbCntr.get()); + + for (Set<T2<QueryTestKey, QueryTestValue>> set : evtsFromCallbacks) + checkEvents(set, expInvkCnt * KEYS_FROM_CALLBACK, grid(0).cache(ccfg.getName()), true); + } + } + finally { + for (QueryCursor<?> qry : qries) + qry.close(); + + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param expCnt Expected count. + * @param cache Cache. + * @param set Received events. + * @throws Exception If failed. + */ + private void checkEvents(final Set<T2<QueryTestKey, QueryTestValue>> set, final int expCnt, IgniteCache cache, + boolean cb) throws Exception { + assertTrue("Expected size: " + expCnt + ", actual: " + set.size(), GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return set.size() >= expCnt; + } + }, 10000L)); + + int startKey = cb ? KEYS : 0; + int endKey = cb ? KEYS + KEYS_FROM_CALLBACK : KEYS; + + for (int i = startKey; i < endKey; i++) { + QueryTestKey key = new QueryTestKey(i); + + QueryTestValue maxVal = (QueryTestValue)cache.get(key); + + for (int val = 0; val <= maxVal.val1; val++) + assertTrue(set.remove(new T2<>(key, new QueryTestValue(val)))); + } + + assertTrue(set.isEmpty()); + } + + /** + * + */ + private static class IncrementTestEntryProcessor implements + CacheEntryProcessor<QueryTestKey, QueryTestValue, Object> { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<QueryTestKey, QueryTestValue> entry, Object... arguments) + throws EntryProcessorException { + if (entry.exists()) + entry.setValue(new QueryTestValue(entry.getValue().val1 + 1)); + else + entry.setValue(new QueryTestValue(0)); + + return null; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheTestRemoteFilterAsync implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private String cacheName; + + /** + * @param cacheName Cache name. + */ + public CacheTestRemoteFilterAsync(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) + throws CacheEntryListenerException { + if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) { + IgniteCache<QueryTestKey, QueryTestValue> cache = ignite.cache(cacheName); + + if (ThreadLocalRandom.current().nextBoolean()) { + Set<QueryTestKey> keys = new LinkedHashSet<>(); + + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + keys.add(new QueryTestKey(key)); + + cache.invokeAll(keys, new IncrementTestEntryProcessor()); + } + else { + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); + } + + filterCbCntr.incrementAndGet(); + } + + return true; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class TestCacheAsyncEventListener + implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> { + /** */ + private final Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts; + + /** */ + private final AtomicInteger cntr; + + /** */ + private final AtomicInteger cbCntr; + + /** */ + private final Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb; + + /** */ + private IgniteCache<QueryTestKey, QueryTestValue> cache; + + /** + * @param rcvsEvts Set for received events. + * @param evtsFromCb Set for received events. + * @param cache Ignite cache. + * @param cntr Received events counter. + * @param cbCntr Received events counter from callbacks. + */ + public TestCacheAsyncEventListener(Set<T2<QueryTestKey, QueryTestValue>> rcvsEvts, + Set<T2<QueryTestKey, QueryTestValue>> evtsFromCb, + @Nullable IgniteCache cache, + AtomicInteger cntr, + AtomicInteger cbCntr) { + this.rcvsEvts = rcvsEvts; + this.evtsFromCb = evtsFromCb; + this.cache = cache; + this.cntr = cntr; + this.cbCntr = cbCntr; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : evts) { + if (e.getKey().compareTo(new QueryTestKey(KEYS)) < 0) { + rcvsEvts.add(new T2<>(e.getKey(), e.getValue())); + + cntr.incrementAndGet(); + + if (cache != null) { + if (ThreadLocalRandom.current().nextBoolean()) { + Set<QueryTestKey> keys = new LinkedHashSet<>(); + + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + keys.add(new QueryTestKey(key)); + + cache.invokeAll(keys, new IncrementTestEntryProcessor()); + } + else { + for (int key = KEYS; key < KEYS + KEYS_FROM_CALLBACK; key++) + cache.invoke(new QueryTestKey(key), new IncrementTestEntryProcessor()); + } + } + } + else { + evtsFromCb.add(new T2<>(e.getKey(), e.getValue())); + + cbCntr.incrementAndGet(); + } + } + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param writeMode Write sync mode. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheWriteSynchronizationMode writeMode) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + writeMode + "-" + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setWriteSynchronizationMode(writeMode); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java new file mode 100644 index 0000000..7d975f2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOrderingEventTest.java @@ -0,0 +1,722 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.FactoryBuilder; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteAsyncCallback; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.spi.eventstorage.memory.MemoryEventStorageSpi; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; +import org.apache.ignite.transactions.Transaction; + +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; + +/** + * + */ +public class CacheContinuousQueryOrderingEventTest extends GridCommonAbstractTest { + /** */ + public static final int LISTENER_CNT = 3; + + /** */ + public static final int KEYS = 10; + + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + public static final int ITERATION_CNT = 100; + + /** */ + private boolean client; + + /** */ + private static volatile boolean fail; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); + + cfg.setClientMode(client); + + MemoryEventStorageSpi storeSpi = new MemoryEventStorageSpi(); + storeSpi.setExpireCount(100); + + cfg.setEventStorageSpi(storeSpi); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + fail = false; + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_VALUES, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedOffheap() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapTwoBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapWithoutBackup() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, false); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapWithoutBackupFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, false); + } + + // ASYNC + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_VALUES, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValuesTwoBackupAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, ATOMIC, + OFFHEAP_VALUES, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicatedOffheapAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, 0, ATOMIC, + OFFHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOnheapWithoutBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, ATOMIC, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapTwoBackupAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 2, TRANSACTIONAL, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapAsync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, PRIMARY_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @throws Exception If failed. + */ + public void testTxOnheapAsyncFullSync() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, 0, TRANSACTIONAL, + ONHEAP_TIERED, FULL_SYNC); + + doOrderingTest(ccfg, true); + } + + /** + * @param ccfg Cache configuration. + * @param async Async filter. + * @throws Exception If failed. + */ + protected void doOrderingTest( + final CacheConfiguration ccfg, + final boolean async) + throws Exception { + ignite(0).createCache(ccfg); + + List<QueryCursor<?>> qries = new ArrayList<>(); + + try { + List<BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>>> rcvdEvts = + new ArrayList<>(LISTENER_CNT * NODES); + + final AtomicInteger qryCntr = new AtomicInteger(0); + + final int threadCnt = 20; + + for (int idx = 0; idx < NODES; idx++) { + for (int i = 0; i < LISTENER_CNT; i++) { + BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue = + new ArrayBlockingQueue<>(ITERATION_CNT * threadCnt); + + ContinuousQuery qry = new ContinuousQuery(); + + if (async) { + qry.setLocalListener(new TestCacheAsyncEventListener(queue, qryCntr)); + + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf( + new CacheTestRemoteFilterAsync(ccfg.getName()))); + } + else { + qry.setLocalListener(new TestCacheEventListener(queue, qryCntr)); + + qry.setRemoteFilterFactory(FactoryBuilder.factoryOf( + new CacheTestRemoteFilter(ccfg.getName()))); + } + + rcvdEvts.add(queue); + + IgniteCache<Object, Object> cache = grid(idx).cache(ccfg.getName()); + + QueryCursor qryCursor = cache.query(qry); + + qries.add(qryCursor); + } + } + + IgniteInternalFuture<Long> f = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + for (int i = 0; i < ITERATION_CNT; i++) { + IgniteCache<QueryTestKey, QueryTestValue> cache = + grid(rnd.nextInt(NODES)).cache(ccfg.getName()); + + QueryTestKey key = new QueryTestKey(rnd.nextInt(KEYS)); + + boolean startTx = cache.getConfiguration(CacheConfiguration.class).getAtomicityMode() == + TRANSACTIONAL && rnd.nextBoolean(); + + Transaction tx = null; + + if (startTx) + tx = cache.unwrap(Ignite.class).transactions().txStart(); + + try { + if ((cache.get(key) == null) || rnd.nextBoolean()) { + cache.invoke(key, new CacheEntryProcessor<QueryTestKey, QueryTestValue, Object>() { + @Override public Object process( + MutableEntry<QueryTestKey, QueryTestValue> entry, + Object... arguments) + throws EntryProcessorException { + if (entry.exists()) + entry.setValue(new QueryTestValue(entry.getValue().val1 + 1)); + else + entry.setValue(new QueryTestValue(0)); + + return null; + } + }); + } + else { + QueryTestValue val; + QueryTestValue newVal; + + do { + val = cache.get(key); + + newVal = val == null ? + new QueryTestValue(0) : new QueryTestValue(val.val1 + 1); + } + while (!cache.replace(key, val, newVal)); + } + } + finally { + if (tx != null) + tx.commit(); + } + } + } + }, threadCnt, "put-thread"); + + f.get(15, TimeUnit.SECONDS); + + GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return qryCntr.get() >= ITERATION_CNT * threadCnt * LISTENER_CNT * NODES; + } + }, 1000L); + + for (BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue : rcvdEvts) + checkEvents(queue, ITERATION_CNT * threadCnt); + + assertFalse("Ordering invocations of filter broken.", fail); + } + finally { + for (QueryCursor<?> qry : qries) + qry.close(); + + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param queue Event queue. + * @throws Exception If failed. + */ + private void checkEvents(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, int expCnt) + throws Exception { + CacheEntryEvent<QueryTestKey, QueryTestValue> evt; + int cnt = 0; + Map<QueryTestKey, Integer> vals = new HashMap<>(); + + while ((evt = queue.poll(100, TimeUnit.MILLISECONDS)) != null) { + assertNotNull(evt); + assertNotNull(evt.getKey()); + + Integer preVal = vals.get(evt.getKey()); + + if (preVal == null) + assertEquals(new QueryTestValue(0), evt.getValue()); + else { + if (!new QueryTestValue(preVal + 1).equals(evt.getValue())) + assertEquals("Key event: " + evt.getKey(), new QueryTestValue(preVal + 1), evt.getValue()); + } + + vals.put(evt.getKey(), evt.getValue().val1); + + ++cnt; + } + + assertEquals(expCnt, cnt); + } + + /** {@inheritDoc} */ + @Override protected long getTestTimeout() { + return TimeUnit.MINUTES.toMillis(8); + } + + /** + * + */ + @IgniteAsyncCallback + private static class CacheTestRemoteFilterAsync extends CacheTestRemoteFilter { + /** + * @param cacheName Cache name. + */ + public CacheTestRemoteFilterAsync(String cacheName) { + super(cacheName); + } + } + + /** + * + */ + private static class CacheTestRemoteFilter implements + CacheEntryEventSerializableFilter<QueryTestKey, QueryTestValue> { + /** */ + private Map<QueryTestKey, QueryTestValue> prevVals = new ConcurrentHashMap<>(); + + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private String cacheName; + + /** + * @param cacheName Cache name. + */ + public CacheTestRemoteFilter(String cacheName) { + this.cacheName = cacheName; + } + + /** {@inheritDoc} */ + @Override public boolean evaluate(CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e) { + if (affinity(ignite.cache(cacheName)).isPrimary(ignite.cluster().localNode(), e.getKey())) { + QueryTestValue prevVal = prevVals.put(e.getKey(), e.getValue()); + + if (prevVal != null) { + if (!new QueryTestValue(prevVal.val1 + 1).equals(e.getValue())) + fail = true; + } + } + + return true; + } + } + + /** + * + */ + @IgniteAsyncCallback + private static class TestCacheAsyncEventListener extends TestCacheEventListener { + /** + * @param queue Queue. + * @param cntr Received events counter. + */ + public TestCacheAsyncEventListener(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, + AtomicInteger cntr) { + super(queue, cntr); + } + } + + /** + * + */ + private static class TestCacheEventListener implements CacheEntryUpdatedListener<QueryTestKey, QueryTestValue> { + /** */ + private final BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue; + + /** */ + private final AtomicInteger cntr; + + /** + * @param queue Queue. + * @param cntr Received events counter. + */ + public TestCacheEventListener(BlockingQueue<CacheEntryEvent<QueryTestKey, QueryTestValue>> queue, + AtomicInteger cntr) { + this.queue = queue; + this.cntr = cntr; + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, + ? extends QueryTestValue>> evts) { + for (CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue> e : evts) { + queue.add((CacheEntryEvent<QueryTestKey, QueryTestValue>)e); + + cntr.incrementAndGet(); + } + } + } + + /** + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @param writeMode Cache write mode. + * @return Cache configuration. + */ + protected CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + CacheWriteSynchronizationMode writeMode) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setName("test-cache-" + atomicityMode + "-" + cacheMode + "-" + memoryMode + "-" + memoryMode + "-" + + backups); + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(writeMode); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + return ccfg; + } + + /** + * + */ + public static class QueryTestKey implements Serializable, Comparable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + + /** {@inheritDoc} */ + @Override public int compareTo(Object o) { + return key - ((QueryTestKey)o).key; + } + } + + /** + * + */ + public static class QueryTestValue implements Serializable { + /** */ + @GridToStringInclude + protected final Integer val1; + + /** */ + @GridToStringInclude + protected final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java index e9fbf70..a6c33bb 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -62,6 +62,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.PA; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; @@ -118,6 +119,7 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract IgniteConfiguration cfg = super.getConfiguration(gridName); ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + ((TcpCommunicationSpi)cfg.getCommunicationSpi()).setSharedMemoryPort(-1); cfg.setClientMode(client); @@ -598,6 +600,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts = new CopyOnWriteArrayList<>(); + if (noOpFilterFactory() != null) + qry.setRemoteFilterFactory(noOpFilterFactory()); + qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> events) throws CacheEntryListenerException { @@ -684,9 +689,17 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract checkSingleEvent(evts.get(7), CREATED, new QueryTestValue(5), null); checkSingleEvent(evts.get(8), EventType.UPDATED, new QueryTestValue(6), new QueryTestValue(5)); + evts.clear(); + cache.remove(key); cache.remove(key); + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return evts.size() == 1; + } + }, 5_000); + evts.clear(); log.info("Finish iteration: " + i); @@ -699,6 +712,13 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract } /** + * @return No-op filter factory for batch operations. + */ + protected Factory<? extends CacheEntryEventFilter<QueryTestKey, QueryTestValue>> noOpFilterFactory() { + return null; + } + + /** * @param ccfg Cache configuration. * @throws Exception If failed. */ @@ -711,6 +731,9 @@ public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstract final List<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> evts = new CopyOnWriteArrayList<>(); + if (noOpFilterFactory() != null) + qry.setRemoteFilterFactory(noOpFilterFactory()); + qry.setLocalListener(new CacheEntryUpdatedListener<QueryTestKey, QueryTestValue>() { @Override public void onUpdated(Iterable<CacheEntryEvent<? extends QueryTestKey, ? extends QueryTestValue>> events) throws CacheEntryListenerException { http://git-wip-us.apache.org/repos/asf/ignite/blob/9df46fc0/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java index f318c38..1ad52d3 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java @@ -41,18 +41,19 @@ public class GridTestKernalContext extends GridKernalContextImpl { */ public GridTestKernalContext(IgniteLogger log) throws IgniteCheckedException { super(new GridLoggerProxy(log, null, null, null), - new IgniteKernal(null), - new IgniteConfiguration(), - new GridKernalGatewayImpl(null), - null, - null, - null, - null, - null, - null, - null, - null, - U.allPluginProviders()); + new IgniteKernal(null), + new IgniteConfiguration(), + new GridKernalGatewayImpl(null), + null, + null, + null, + null, + null, + null, + null, + null, + null, + U.allPluginProviders()); GridTestUtils.setFieldValue(grid(), "cfg", config());
