http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java index 27edb0c..e6bfd87 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAbstractTest.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -56,6 +57,7 @@ import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor; @@ -70,6 +72,7 @@ import static javax.cache.event.EventType.CREATED; import static javax.cache.event.EventType.EXPIRED; import static javax.cache.event.EventType.REMOVED; import static javax.cache.event.EventType.UPDATED; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -79,7 +82,7 @@ import static org.apache.ignite.cache.CacheMode.REPLICATED; */ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAbstractTest { /** */ - private static volatile List<CacheEntryEvent<? extends Integer, ? extends Integer>> evts; + private static volatile List<CacheEntryEvent<?, ?>> evts; /** */ private static volatile CountDownLatch evtsLatch; @@ -91,7 +94,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb private Integer lastKey = 0; /** */ - private CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg; + private CacheEntryListenerConfiguration<Object, Object> lsnrCfg; + + /** */ + private boolean useObjects; /** {@inheritDoc} */ @SuppressWarnings("unchecked") @@ -103,9 +109,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cfg.setEagerTtl(eagerTtl()); + cfg.setMemoryMode(memoryMode()); + return cfg; } + /** + * @return Cache memory mode. + */ + protected CacheMemoryMode memoryMode() { + return ONHEAP_TIERED; + } + /** {@inheritDoc} */ @Override protected void afterTest() throws Exception { super.afterTest(); @@ -129,9 +144,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @throws Exception If failed. */ public void testExceptionIgnored() throws Exception { - CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( - new Factory<CacheEntryListener<Integer, Integer>>() { - @Override public CacheEntryListener<Integer, Integer> create() { + CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Object, Object>>() { + @Override public CacheEntryListener<Object, Object> create() { return new ExceptionListener(); } }, @@ -140,7 +155,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb false ); - IgniteCache<Integer, Integer> cache = jcache(); + IgniteCache<Object, Object> cache = jcache(); cache.registerCacheEntryListener(lsnrCfg); @@ -158,13 +173,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } lsnrCfg = new MutableCacheEntryListenerConfiguration<>( - new Factory<CacheEntryListener<Integer, Integer>>() { - @Override public CacheEntryListener<Integer, Integer> create() { + new Factory<CacheEntryListener<Object, Object>>() { + @Override public CacheEntryListener<Object, Object> create() { return new CreateUpdateRemoveExpireListener(); } }, - new Factory<CacheEntryEventSerializableFilter<? super Integer, ? super Integer>>() { - @Override public CacheEntryEventSerializableFilter<? super Integer, ? super Integer> create() { + new Factory<CacheEntryEventSerializableFilter<Object, Object>>() { + @Override public CacheEntryEventSerializableFilter<Object, Object> create() { return new ExceptionFilter(); } }, @@ -192,9 +207,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @throws Exception If failed. */ public void testNoOldValue() throws Exception { - CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( - new Factory<CacheEntryListener<Integer, Integer>>() { - @Override public CacheEntryListener<Integer, Integer> create() { + CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Object, Object>>() { + @Override public CacheEntryListener<Object, Object> create() { return new CreateUpdateRemoveExpireListener(); } }, @@ -203,7 +218,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb true ); - IgniteCache<Integer, Integer> cache = jcache(); + IgniteCache<Object, Object> cache = jcache(); try { for (Integer key : keys()) { @@ -222,21 +237,30 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * @throws Exception If failed. */ + public void testSynchronousEventsObjectKeyValue() throws Exception { + useObjects = true; + + testSynchronousEvents(); + } + + /** + * @throws Exception If failed. + */ public void testSynchronousEvents() throws Exception { - final CacheEntryCreatedListener<Integer, Integer> lsnr = new CreateUpdateRemoveExpireListener() { - @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + final CacheEntryCreatedListener<Object, Object> lsnr = new CreateUpdateRemoveExpireListener() { + @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) { super.onRemoved(evts); awaitLatch(); } - @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) { super.onCreated(evts); awaitLatch(); } - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { super.onUpdated(evts); awaitLatch(); @@ -252,9 +276,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } }; - CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( - new Factory<CacheEntryListener<Integer, Integer>>() { - @Override public CacheEntryListener<Integer, Integer> create() { + CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Object, Object>>() { + @Override public CacheEntryListener<Object, Object> create() { return lsnr; } }, @@ -263,7 +287,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb true ); - IgniteCache<Integer, Integer> cache = jcache(); + IgniteCache<Object, Object> cache = jcache(); cache.registerCacheEntryListener(lsnrCfg); @@ -299,7 +323,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb if (!eagerTtl()) { U.sleep(1100); - assertNull(primaryCache(key, cache.getName()).get(key)); + assertNull(primaryCache(key, cache.getName()).get(key(key))); evtsLatch.await(5000, MILLISECONDS); @@ -378,13 +402,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb final CyclicBarrier barrier = new CyclicBarrier(THREADS); - final IgniteCache<Integer, Integer> cache = jcache(0); + final IgniteCache<Object, Object> cache = jcache(0); GridTestUtils.runMultiThreadedAsync(new Callable<Void>() { @Override public Void call() throws Exception { - CacheEntryListenerConfiguration<Integer, Integer> cfg = new MutableCacheEntryListenerConfiguration<>( - new Factory<CacheEntryListener<Integer, Integer>>() { - @Override public CacheEntryListener<Integer, Integer> create() { + CacheEntryListenerConfiguration<Object, Object> cfg = new MutableCacheEntryListenerConfiguration<>( + new Factory<CacheEntryListener<Object, Object>>() { + @Override public CacheEntryListener<Object, Object> create() { return new CreateUpdateRemoveExpireListener(); } }, @@ -441,9 +465,13 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @param expEvts Expected events number. * @throws Exception If failed. */ - private void syncEvent(Integer key, Integer val, IgniteCache<Integer, Integer> cache, int expEvts) + private void syncEvent( + Integer key, + Integer val, + IgniteCache<Object, Object> cache, + int expEvts) throws Exception { - evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>()); + evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>()); evtsLatch = new CountDownLatch(expEvts); @@ -466,9 +494,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb }); if (val != null) - cache.put(key, val); + cache.put(key(key), value(val)); else - cache.remove(key); + cache.remove(key(key)); done.set(true); @@ -480,15 +508,45 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } /** + * @param key Integer key. + * @return Key instance. + */ + private Object key(Integer key) { + assert key != null; + + return useObjects ? new ListenerTestKey(key) : key; + } + + /** + * @param val Integer value. + * @return Value instance. + */ + private Object value(Integer val) { + if (val == null) + return null; + + return useObjects ? new ListenerTestValue(val) : val; + } + + /** + * @throws Exception If failed. + */ + public void testEventsObjectKeyValue() throws Exception { + useObjects = true; + + testEvents(); + } + + /** * @throws Exception If failed. */ public void testEvents() throws Exception { - IgniteCache<Integer, Integer> cache = jcache(); + IgniteCache<Object, Object> cache = jcache(); - Map<Integer, Integer> vals = new HashMap<>(); + Map<Object, Object> vals = new HashMap<>(); for (int i = 0; i < 100; i++) - vals.put(i + 2_000_000, i); + vals.put(key(i + 2_000_000), value(i)); cache.putAll(vals); // Put some data in cache to make sure events are not generated for existing entries. @@ -518,7 +576,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb checkEvents(cache, new CreateUpdateRemoveExpireListenerFactory(), key, true, true, true, true); } - CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( new CreateUpdateRemoveExpireListenerFactory(), new TestFilterFactory(), true, @@ -551,7 +609,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @throws Exception If failed. */ @SuppressWarnings("unchecked") - private void checkListenerOnStart(Map<Integer, Integer> vals) throws Exception { + private void checkListenerOnStart(Map<Object, Object> vals) throws Exception { lsnrCfg = new MutableCacheEntryListenerConfiguration<>( new CreateUpdateRemoveExpireListenerFactory(), null, @@ -564,7 +622,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb try { awaitPartitionMapExchange(); - IgniteCache<Integer, Integer> cache = grid.cache(null); + IgniteCache<Object, Object> cache = grid.cache(null); Integer key = Integer.MAX_VALUE; @@ -588,7 +646,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb try { awaitPartitionMapExchange(); - IgniteCache<Integer, Integer> cache = grid.cache(null); + IgniteCache<Object, Object> cache = grid.cache(null); log.info("Check filter for listener in configuration."); @@ -613,14 +671,14 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @throws Exception If failed. */ private void checkEvents( - final IgniteCache<Integer, Integer> cache, - final Factory<CacheEntryListener<Integer, Integer>> lsnrFactory, + final IgniteCache<Object, Object> cache, + final Factory<CacheEntryListener<Object, Object>> lsnrFactory, Integer key, boolean create, boolean update, boolean rmv, boolean expire) throws Exception { - CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( + CacheEntryListenerConfiguration<Object, Object> lsnrCfg = new MutableCacheEntryListenerConfiguration<>( lsnrFactory, null, true, @@ -642,8 +700,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @param vals Values in cache. * @throws Exception If failed. */ - private void checkFilter(final IgniteCache<Integer, Integer> cache, Map<Integer, Integer> vals) throws Exception { - evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>()); + private void checkFilter(final IgniteCache<Object, Object> cache, Map<Object, Object> vals) throws Exception { + evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>()); final int expEvts = (vals.size() / 2) * 4; // Remove, create, update and expire for half of modified entries. @@ -653,16 +711,18 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cache.putAll(vals); - final Map<Integer, Integer> newVals = new HashMap<>(); + final Map<Object, Object> newVals = new HashMap<>(); - for (Integer key : vals.keySet()) - newVals.put(key, -1); + for (Object key : vals.keySet()) + newVals.put(key, value(-1)); cache.withExpiryPolicy(new ModifiedExpiryPolicy(new Duration(MILLISECONDS, 500))).putAll(newVals); + U.sleep(1000); + GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { - for (Integer key : newVals.keySet()) { + for (Object key : newVals.keySet()) { if (primaryCache(key, cache.getName()).get(key) != null) return false; } @@ -675,13 +735,20 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb assertEquals(expEvts, evts.size()); - Set<Integer> rmvd = new HashSet<>(); - Set<Integer> created = new HashSet<>(); - Set<Integer> updated = new HashSet<>(); - Set<Integer> expired = new HashSet<>(); + Set<Object> rmvd = new HashSet<>(); + Set<Object> created = new HashSet<>(); + Set<Object> updated = new HashSet<>(); + Set<Object> expired = new HashSet<>(); + + for (CacheEntryEvent<?, ?> evt : evts) { + Integer key; + + if (useObjects) + key = ((ListenerTestKey)evt.getKey()).key; + else + key = (Integer)evt.getKey(); - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { - assertTrue(evt.getKey() % 2 == 0); + assertTrue(key % 2 == 0); assertTrue(vals.keySet().contains(evt.getKey())); @@ -707,7 +774,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb break; case UPDATED: - assertEquals(-1, (int)evt.getValue()); + assertEquals(value(-1), evt.getValue()); assertEquals(vals.get(evt.getKey()), evt.getOldValue()); @@ -722,7 +789,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb case EXPIRED: assertNull(evt.getValue()); - assertEquals(-1, (int)evt.getOldValue()); + assertEquals(value(-1), evt.getOldValue()); assertTrue(rmvd.contains(evt.getKey())); @@ -757,8 +824,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @throws Exception If failed. */ private void checkEvents( - final IgniteCache<Integer, Integer> cache, - final CacheEntryListenerConfiguration<Integer, Integer> lsnrCfg, + final IgniteCache<Object, Object> cache, + final CacheEntryListenerConfiguration<Object, Object> lsnrCfg, Integer key, boolean create, boolean update, @@ -789,64 +856,64 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb if (expire) expEvts += 2; - evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<? extends Integer, ? extends Integer>>()); + evts = Collections.synchronizedList(new ArrayList<CacheEntryEvent<?, ?>>()); evtsLatch = new CountDownLatch(expEvts); - cache.put(key, 0); + cache.put(key(key), value(0)); for (int i = 0; i < UPDATES; i++) { if (i % 2 == 0) - cache.put(key, i + 1); + cache.put(key(key), value(i + 1)); else - cache.invoke(key, new EntrySetValueProcessor(i + 1)); + cache.invoke(key(key), new EntrySetValueProcessor(value(i + 1))); } // Invoke processor does not update value, should not trigger event. - assertEquals(String.valueOf(UPDATES), cache.invoke(key, new EntryToStringProcessor())); + assertEquals(String.valueOf(UPDATES), cache.invoke(key(key), new EntryToStringProcessor())); - assertFalse(cache.putIfAbsent(key, -1)); + assertFalse(cache.putIfAbsent(key(key), value(-1))); - assertFalse(cache.remove(key, -1)); + assertFalse(cache.remove(key(key), value(-1))); - assertTrue(cache.remove(key)); + assertTrue(cache.remove(key(key))); - IgniteCache<Integer, Integer> expirePlcCache = + IgniteCache<Object, Object> expirePlcCache = cache.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100))); - expirePlcCache.put(key, 10); + expirePlcCache.put(key(key), value(10)); U.sleep(700); if (!eagerTtl()) - assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled. + assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled. - IgniteCache<Integer, Integer> cache1 = cache; + IgniteCache<Object, Object> cache1 = cache; if (gridCount() > 1) cache1 = jcache(1); // Do updates from another node. - cache1.put(key, 1); + cache1.put(key(key), value(1)); - cache1.put(key, 2); + cache1.put(key(key), value(2)); - assertTrue(cache1.remove(key)); + assertTrue(cache1.remove(key(key))); - IgniteCache<Integer, Integer> expirePlcCache1 = + IgniteCache<Object, Object> expirePlcCache1 = cache1.withExpiryPolicy(new CreatedExpiryPolicy(new Duration(MILLISECONDS, 100))); - expirePlcCache1.put(key, 20); + expirePlcCache1.put(key(key), value(20)); U.sleep(200); if (!eagerTtl()) - assertNull(primaryCache(key, cache.getName()).get(key)); // Provoke expire event if eager ttl is disabled. + assertNull(primaryCache(key, cache.getName()).get(key(key))); // Provoke expire event if eager ttl is disabled. evtsLatch.await(5000, MILLISECONDS); assertEquals(expEvts, evts.size()); - Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter = evts.iterator(); + Iterator<CacheEntryEvent<?, ?>> iter = evts.iterator(); if (create) checkEvent(iter, key, CREATED, 0, null); @@ -886,11 +953,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb cache.deregisterCacheEntryListener(lsnrCfg); - cache.put(key, 1); + cache.put(key(key), value(1)); - cache.put(key, 2); + cache.put(key(key), value(2)); - assertTrue(cache.remove(key)); + assertTrue(cache.remove(key(key))); U.sleep(500); // Sleep some time to ensure listener was really removed. @@ -908,26 +975,26 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * @param expVal Expected value. * @param expOld Expected old value. */ - private void checkEvent(Iterator<CacheEntryEvent<? extends Integer, ? extends Integer>> iter, + private void checkEvent(Iterator<CacheEntryEvent<?, ?>> iter, Integer expKey, EventType expType, @Nullable Integer expVal, @Nullable Integer expOld) { assertTrue(iter.hasNext()); - CacheEntryEvent<? extends Integer, ? extends Integer> evt = iter.next(); + CacheEntryEvent<?, ?> evt = iter.next(); iter.remove(); assertTrue(evt.getSource() instanceof IgniteCacheProxy); - assertEquals(expKey, evt.getKey()); + assertEquals(key(expKey), evt.getKey()); assertEquals(expType, evt.getEventType()); - assertEquals(expVal, evt.getValue()); + assertEquals(value(expVal), evt.getValue()); - assertEquals(expOld, evt.getOldValue()); + assertEquals(value(expOld), evt.getOldValue()); if (expOld == null) assertFalse(evt.isOldValueAvailable()); @@ -977,7 +1044,7 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * @param evt Event. */ - private static void onEvent(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + private static void onEvent(CacheEntryEvent<?, ?> evt) { // System.out.println("Received event [evt=" + evt + ", thread=" + Thread.currentThread().getName() + ']'); assertNotNull(evt); @@ -993,9 +1060,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + private static class CreateUpdateRemoveExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryListener<Integer, Integer> create() { + @Override public CacheEntryListener<Object, Object> create() { return new CreateUpdateRemoveExpireListener(); } } @@ -1003,9 +1070,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + private static class NoOpCreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryListener<Integer, Integer> create() { + @Override public CacheEntryListener<Object, Object> create() { return new NoOpCreateUpdateListener(); } } @@ -1013,9 +1080,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + private static class CreateUpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryListener<Integer, Integer> create() { + @Override public CacheEntryListener<Object, Object> create() { return new CreateUpdateListener(); } } @@ -1023,9 +1090,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class CreateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + private static class CreateListenerFactory implements Factory<CacheEntryListener<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryListener<Integer, Integer> create() { + @Override public CacheEntryListener<Object, Object> create() { return new CreateListener(); } } @@ -1033,9 +1100,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class RemoveListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + private static class RemoveListenerFactory implements Factory<CacheEntryListener<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryListener<Integer, Integer> create() { + @Override public CacheEntryListener<Object, Object> create() { return new RemoveListener(); } } @@ -1043,9 +1110,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class UpdateListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + private static class UpdateListenerFactory implements Factory<CacheEntryListener<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryListener<Integer, Integer> create() { + @Override public CacheEntryListener<Object, Object> create() { return new UpdateListener(); } } @@ -1053,9 +1120,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class ExpireListenerFactory implements Factory<CacheEntryListener<Integer, Integer>> { + private static class ExpireListenerFactory implements Factory<CacheEntryListener<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryListener<Integer, Integer> create() { + @Override public CacheEntryListener<Object, Object> create() { return new ExpireListener(); } } @@ -1063,9 +1130,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Integer, Integer>> { + private static class TestFilterFactory implements Factory<CacheEntryEventSerializableFilter<Object, Object>> { /** {@inheritDoc} */ - @Override public CacheEntryEventSerializableFilter<Integer, Integer> create() { + @Override public CacheEntryEventSerializableFilter<Object, Object> create() { return new TestFilter(); } } @@ -1073,10 +1140,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class CreateListener implements CacheEntryCreatedListener<Integer, Integer> { + private static class CreateListener implements CacheEntryCreatedListener<Object, Object> { /** {@inheritDoc} */ - @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) onEvent(evt); } } @@ -1084,10 +1151,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class UpdateListener implements CacheEntryUpdatedListener<Integer, Integer> { + private static class UpdateListener implements CacheEntryUpdatedListener<Object, Object> { /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) onEvent(evt); } } @@ -1095,10 +1162,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class RemoveListener implements CacheEntryRemovedListener<Integer, Integer> { + private static class RemoveListener implements CacheEntryRemovedListener<Object, Object> { /** {@inheritDoc} */ - @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) onEvent(evt); } } @@ -1106,10 +1173,10 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class ExpireListener implements CacheEntryExpiredListener<Integer, Integer> { + private static class ExpireListener implements CacheEntryExpiredListener<Object, Object> { /** {@inheritDoc} */ - @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) onEvent(evt); } } @@ -1117,32 +1184,39 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class TestFilter implements CacheEntryEventSerializableFilter<Integer, Integer> { + private static class TestFilter implements CacheEntryEventSerializableFilter<Object, Object> { /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) { assert evt != null; assert evt.getSource() != null : evt; assert evt.getEventType() != null : evt; assert evt.getKey() != null : evt; - return evt.getKey() % 2 == 0; + Integer key; + + if (evt.getKey() instanceof ListenerTestKey) + key = ((ListenerTestKey)evt.getKey()).key; + else + key = (Integer)evt.getKey(); + + return key % 2 == 0; } } /** * */ - private static class CreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>, - CacheEntryUpdatedListener<Integer, Integer> { + private static class CreateUpdateListener implements CacheEntryCreatedListener<Object, Object>, + CacheEntryUpdatedListener<Object, Object> { /** {@inheritDoc} */ - @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) onEvent(evt); } /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) onEvent(evt); } } @@ -1150,11 +1224,11 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Integer, Integer>, - CacheEntryUpdatedListener<Integer, Integer> { + private static class NoOpCreateUpdateListener implements CacheEntryCreatedListener<Object, Object>, + CacheEntryUpdatedListener<Object, Object> { /** {@inheritDoc} */ - @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) { assertNotNull(evt); assertNotNull(evt.getSource()); assertNotNull(evt.getEventType()); @@ -1163,8 +1237,8 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb } /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) { assertNotNull(evt); assertNotNull(evt.getSource()); assertNotNull(evt.getEventType()); @@ -1177,16 +1251,16 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class CreateUpdateRemoveExpireListener extends CreateUpdateListener - implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> { + implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> { /** {@inheritDoc} */ - @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) onEvent(evt); } /** {@inheritDoc} */ - @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { - for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : evts) + @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) onEvent(evt); } } @@ -1194,9 +1268,9 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Integer, Integer> { + private static class ExceptionFilter implements CacheEntryEventSerializableFilter<Object, Object> { /** {@inheritDoc} */ - @Override public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) { throw new RuntimeException("Test filter error."); } } @@ -1205,24 +1279,24 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb * */ private static class ExceptionListener extends CreateUpdateListener - implements CacheEntryRemovedListener<Integer, Integer>, CacheEntryExpiredListener<Integer, Integer> { + implements CacheEntryRemovedListener<Object, Object>, CacheEntryExpiredListener<Object, Object> { /** {@inheritDoc} */ - @Override public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + @Override public void onCreated(Iterable<CacheEntryEvent<?, ?>> evts) { error(); } /** {@inheritDoc} */ - @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { error(); } /** {@inheritDoc} */ - @Override public void onExpired(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + @Override public void onExpired(Iterable<CacheEntryEvent<?, ?>> evts) { error(); } /** {@inheritDoc} */ - @Override public void onRemoved(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + @Override public void onRemoved(Iterable<CacheEntryEvent<?, ?>> evts) { error(); } @@ -1237,10 +1311,12 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - protected static class EntryToStringProcessor implements EntryProcessor<Integer, Integer, String> { + protected static class EntryToStringProcessor implements EntryProcessor<Object, Object, String> { /** {@inheritDoc} */ - @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments) - throws EntryProcessorException { + @Override public String process(MutableEntry<Object, Object> e, Object... args) { + if (e.getValue() instanceof ListenerTestValue) + return String.valueOf(((ListenerTestValue)e.getValue()).val1); + return String.valueOf(e.getValue()); } @@ -1253,19 +1329,19 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb /** * */ - protected static class EntrySetValueProcessor implements EntryProcessor<Integer, Integer, String> { + protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, String> { /** */ - private Integer val; + private Object val; /** * @param val Value to set. */ - public EntrySetValueProcessor(Integer val) { + public EntrySetValueProcessor(Object val) { this.val = val; } /** {@inheritDoc} */ - @Override public String process(MutableEntry<Integer, Integer> e, Object... arguments) + @Override public String process(MutableEntry<Object, Object> e, Object... args) throws EntryProcessorException { e.setValue(val); @@ -1307,4 +1383,88 @@ public abstract class IgniteCacheEntryListenerAbstractTest extends IgniteCacheAb // No-op. } } + + /** + * + */ + static class ListenerTestKey implements Serializable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public ListenerTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ListenerTestKey that = (ListenerTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ListenerTestKey.class, this); + } + } + + /** + * + */ + static class ListenerTestValue implements Serializable { + /** */ + private final Integer val1; + + /** */ + private final String val2; + + /** + * @param val Value. + */ + public ListenerTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + ListenerTestValue that = (ListenerTestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ListenerTestValue.class, this); + } + } }
http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java new file mode 100644 index 0000000..69efb84 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapTieredTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; + +/** + * + */ +public class IgniteCacheEntryListenerAtomicOffheapTieredTest extends IgniteCacheEntryListenerAtomicTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_TIERED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java new file mode 100644 index 0000000..23b1bc0 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerAtomicOffheapValuesTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.*; + +/** + * + */ +public class IgniteCacheEntryListenerAtomicOffheapValuesTest extends IgniteCacheEntryListenerAtomicTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_VALUES; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java new file mode 100644 index 0000000..d552195 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapTieredTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; + +/** + * + */ +public class IgniteCacheEntryListenerTxOffheapTieredTest extends IgniteCacheEntryListenerTxTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_TIERED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java new file mode 100644 index 0000000..32555c8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxOffheapValuesTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; + +/** + * + */ +public class IgniteCacheEntryListenerTxOffheapValuesTest extends IgniteCacheEntryListenerTxTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_VALUES; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java index a9e43d4..41725e7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheEntryListenerTxTest.java @@ -48,6 +48,7 @@ public class IgniteCacheEntryListenerTxTest extends IgniteCacheEntryListenerAbst return null; } + /** {@inheritDoc} */ @Override public void testEvents(){ fail("https://issues.apache.org/jira/browse/IGNITE-1600"); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java index 1c65f9b..a42f056 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractSelfTest.java @@ -55,6 +55,7 @@ import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.affinity.Affinity; import org.apache.ignite.cache.query.ContinuousQuery; @@ -97,6 +98,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheMemoryMode.*; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; @@ -142,6 +144,7 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC ccfg.setBackups(backups); ccfg.setWriteSynchronizationMode(FULL_SYNC); ccfg.setNearConfiguration(nearCacheConfiguration()); + ccfg.setMemoryMode(memoryMode()); cfg.setCacheConfiguration(ccfg); @@ -151,6 +154,13 @@ public abstract class CacheContinuousQueryFailoverAbstractSelfTest extends GridC } /** + * @return Cache memory mode. + */ + protected CacheMemoryMode memoryMode() { + return ONHEAP_TIERED; + } + + /** * @return Near cache configuration. */ protected NearCacheConfiguration nearCacheConfiguration() { http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java new file mode 100644 index 0000000..cc8590d --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; + +/** + * + */ +public class CacheContinuousQueryFailoverAtomicPrimaryWriteOrderOffheapTieredTest + extends CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_TIERED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java new file mode 100644 index 0000000..cae06c3 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverTxOffheapTieredTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; + +/** + * + */ +public class CacheContinuousQueryFailoverTxOffheapTieredTest extends CacheContinuousQueryFailoverTxSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_TIERED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java new file mode 100644 index 0000000..d9b2091 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryRandomOperationsTest.java @@ -0,0 +1,684 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.Random; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import javax.cache.Cache; +import javax.cache.configuration.Factory; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryUpdatedListener; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMemoryMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.QueryCursor; +import org.apache.ignite.cache.store.CacheStore; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; +import static org.apache.ignite.cache.CacheMode.PARTITIONED; +import static org.apache.ignite.cache.CacheMode.REPLICATED; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +public class CacheContinuousQueryRandomOperationsTest extends GridCommonAbstractTest { + /** */ + private static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final int NODES = 5; + + /** */ + private static final int KEYS = 10; + + /** */ + private static final int VALS = 10; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGridsMultiThreaded(NODES - 1); + + client = true; + + startGrid(NODES - 1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomic() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicReplicated() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapValues() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_VALUES, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicOffheapTiered() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + ATOMIC, + OFFHEAP_TIERED, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicNoBackups() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + ATOMIC, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTx() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxReplicated() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(REPLICATED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapValues() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_VALUES, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxOffheapTiered() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 1, + TRANSACTIONAL, + OFFHEAP_TIERED, + false); + + testContinuousQuery(ccfg); + } + + /** + * @throws Exception If failed. + */ + public void testTxNoBackups() throws Exception { + CacheConfiguration<Object, Object> ccfg = cacheConfiguration(PARTITIONED, + 0, + TRANSACTIONAL, + ONHEAP_TIERED, + false); + + testContinuousQuery(ccfg); + } + + /** + * @param ccfg Cache configuration. + * @throws Exception If failed. + */ + private void testContinuousQuery(CacheConfiguration<Object, Object> ccfg) throws Exception { + ignite(0).createCache(ccfg); + + try { + IgniteCache<Object, Object> cache = ignite(NODES - 1).cache(ccfg.getName()); + + long seed = System.currentTimeMillis(); + + Random rnd = new Random(seed); + + log.info("Random seed: " + seed); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + final BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue = + new ArrayBlockingQueue<>(10_000); + + qry.setLocalListener(new CacheEntryUpdatedListener<Object, Object>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<?, ?>> evts) { + for (CacheEntryEvent<?, ?> evt : evts) { + // System.out.println("Event: " + evt); + + evtsQueue.add(evt); + } + } + }); + + QueryCursor<?> cur = cache.query(qry); + + ConcurrentMap<Object, Object> expData = new ConcurrentHashMap<>(); + + try { + for (int i = 0; i < 1000; i++) { + if (i % 100 == 0) + log.info("Iteration: " + i); + + randomUpdate(rnd, evtsQueue, expData, cache); + } + } + finally { + cur.close(); + } + } + finally { + ignite(0).destroyCache(ccfg.getName()); + } + } + + /** + * @param rnd Random generator. + * @param evtsQueue Events queue. + * @param expData Expected cache data. + * @param cache Cache. + * @throws Exception If failed. + */ + private void randomUpdate( + Random rnd, + BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue, + ConcurrentMap<Object, Object> expData, + IgniteCache<Object, Object> cache) + throws Exception { + Object key = new QueryTestKey(rnd.nextInt(KEYS)); + Object newVal = value(rnd); + Object oldVal = expData.get(key); + + int op = rnd.nextInt(11); + + // log.info("Random operation [key=" + key + ", op=" + op + ']'); + + switch (op) { + case 0: { + cache.put(key, newVal); + + waitEvent(evtsQueue, key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 1: { + cache.getAndPut(key, newVal); + + waitEvent(evtsQueue, key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 2: { + cache.remove(key); + + waitEvent(evtsQueue, key, null, oldVal); + + expData.remove(key); + + break; + } + + case 3: { + cache.getAndRemove(key); + + waitEvent(evtsQueue, key, null, oldVal); + + expData.remove(key); + + break; + } + + case 4: { + cache.invoke(key, new EntrySetValueProcessor(newVal, rnd.nextBoolean())); + + waitEvent(evtsQueue, key, newVal, oldVal); + + expData.put(key, newVal); + + break; + } + + case 5: { + cache.invoke(key, new EntrySetValueProcessor(null, rnd.nextBoolean())); + + waitEvent(evtsQueue, key, null, oldVal); + + expData.remove(key); + + break; + } + + case 6: { + cache.putIfAbsent(key, newVal); + + if (oldVal == null) { + waitEvent(evtsQueue, key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueue); + + break; + } + + case 7: { + cache.getAndPutIfAbsent(key, newVal); + + if (oldVal == null) { + waitEvent(evtsQueue, key, newVal, null); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueue); + + break; + } + + case 8: { + cache.replace(key, newVal); + + if (oldVal != null) { + waitEvent(evtsQueue, key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueue); + + break; + } + + case 9: { + cache.getAndReplace(key, newVal); + + if (oldVal != null) { + waitEvent(evtsQueue, key, newVal, oldVal); + + expData.put(key, newVal); + } + else + checkNoEvent(evtsQueue); + + break; + } + + case 10: { + if (oldVal != null) { + Object replaceVal = value(rnd); + + boolean success = replaceVal.equals(oldVal); + + if (success) { + cache.replace(key, replaceVal, newVal); + + waitEvent(evtsQueue, key, newVal, oldVal); + + expData.put(key, newVal); + } + else { + cache.replace(key, replaceVal, newVal); + + checkNoEvent(evtsQueue); + } + } + else { + cache.replace(key, value(rnd), newVal); + + checkNoEvent(evtsQueue); + } + + break; + } + + default: + fail(); + } + } + + /** + * @param rnd Random generator. + * @return Cache value. + */ + private static Object value(Random rnd) { + return new QueryTestValue(rnd.nextInt(VALS)); + } + + /** + * @param evtsQueue Event queue. + * @param key Key. + * @param val Value. + * @param oldVal Old value. + * @throws Exception If failed. + */ + private void waitEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue, + Object key, Object val, Object oldVal) throws Exception { + if (val == null && oldVal == null) { + checkNoEvent(evtsQueue); + + return; + } + + CacheEntryEvent<?, ?> evt = evtsQueue.poll(5, SECONDS); + + assertNotNull("Failed to wait for event [key=" + key + + ", val=" + val + + ", oldVal=" + oldVal + ']', evt); + assertEquals(key, evt.getKey()); + assertEquals(val, evt.getValue()); + assertEquals(oldVal, evt.getOldValue()); + } + + /** + * @param evtsQueue Event queue. + * @throws Exception If failed. + */ + private void checkNoEvent(BlockingQueue<CacheEntryEvent<?, ?>> evtsQueue) throws Exception { + CacheEntryEvent<?, ?> evt = evtsQueue.poll(50, MILLISECONDS); + + assertNull(evt); + } + + /** + * + * @param cacheMode Cache mode. + * @param backups Number of backups. + * @param atomicityMode Cache atomicity mode. + * @param memoryMode Cache memory mode. + * @param store If {@code true} configures dummy cache store. + * @return Cache configuration. + */ + private CacheConfiguration<Object, Object> cacheConfiguration( + CacheMode cacheMode, + int backups, + CacheAtomicityMode atomicityMode, + CacheMemoryMode memoryMode, + boolean store) { + CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setCacheMode(cacheMode); + ccfg.setMemoryMode(memoryMode); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setAtomicWriteOrderMode(PRIMARY); + + if (cacheMode == PARTITIONED) + ccfg.setBackups(backups); + + if (store) { + ccfg.setCacheStoreFactory(new TestStoreFactory()); + ccfg.setReadThrough(true); + ccfg.setWriteThrough(true); + } + + return ccfg; + } + + /** + * + */ + private static class TestStoreFactory implements Factory<CacheStore<Object, Object>> { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public CacheStore<Object, Object> create() { + return new CacheStoreAdapter() { + @Override public Object load(Object key) throws CacheLoaderException { + return null; + } + + @Override public void write(Cache.Entry entry) throws CacheWriterException { + // No-op. + } + + @Override public void delete(Object key) throws CacheWriterException { + // No-op. + } + }; + } + } + + /** + * + */ + static class QueryTestKey implements Serializable { + /** */ + private final Integer key; + + /** + * @param key Key. + */ + public QueryTestKey(Integer key) { + this.key = key; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestKey that = (QueryTestKey)o; + + return key.equals(that.key); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return key.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestKey.class, this); + } + } + + /** + * + */ + static class QueryTestValue implements Serializable { + /** */ + private final Integer val1; + + /** */ + private final String val2; + + /** + * @param val Value. + */ + public QueryTestValue(Integer val) { + this.val1 = val; + this.val2 = String.valueOf(val); + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != o.getClass()) + return false; + + QueryTestValue that = (QueryTestValue) o; + + return val1.equals(that.val1) && val2.equals(that.val2); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int res = val1.hashCode(); + + res = 31 * res + val2.hashCode(); + + return res; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(QueryTestValue.class, this); + } + } + /** + * + */ + protected static class EntrySetValueProcessor implements EntryProcessor<Object, Object, Object> { + /** */ + private Object val; + + /** */ + private boolean retOld; + + /** + * @param val Value to set. + * @param retOld Return old value flag. + */ + public EntrySetValueProcessor(Object val, boolean retOld) { + this.val = val; + this.retOld = retOld; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry<Object, Object> e, Object... args) { + Object old = retOld ? e.getValue() : null; + + if (val != null) + e.setValue(val); + else + e.remove(); + + return old; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(EntrySetValueProcessor.class, this); + } + } + +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 5abb98d..dbe282e 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -40,6 +40,7 @@ import org.apache.ignite.IgniteCache; import org.apache.ignite.IgniteException; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.query.ContinuousQuery; @@ -73,9 +74,9 @@ import org.jsr166.ConcurrentHashMap8; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; - import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMemoryMode.ONHEAP_TIERED; import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -117,6 +118,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo cacheCfg.setReadThrough(true); cacheCfg.setWriteThrough(true); cacheCfg.setLoadPreviousValue(true); + cacheCfg.setMemoryMode(memoryMode()); cfg.setCacheConfiguration(cacheCfg); } @@ -135,6 +137,13 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } /** + * @return Cache memory mode. + */ + protected CacheMemoryMode memoryMode() { + return ONHEAP_TIERED; + } + + /** * @return Peer class loading enabled flag. */ protected boolean peerClassLoadingEnabled() { @@ -393,8 +402,8 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } }); - try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2); - QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) { + try (QueryCursor<Cache.Entry<Integer, Integer>> qryCur2 = cache1.query(qry2); + QueryCursor<Cache.Entry<Integer, Integer>> qryCur1 = cache.query(qry1)) { for (int i = 0; i < gridCount(); i++) { IgniteCache<Object, Object> cache0 = grid(i).cache(null); @@ -448,7 +457,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } }); - QueryCursor<Cache.Entry<Integer, Integer>> query = cache.query(qry); + QueryCursor<Cache.Entry<Integer, Integer>> qryCur = cache.query(qry); for (int key = 0; key < keyCnt; key++) cache.put(key, key); @@ -461,7 +470,7 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo }, 2000L); } finally { - query.close(); + qryCur.close(); } } else { http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java new file mode 100644 index 0000000..d6948e2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapTieredTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; + +/** + * + */ +public class GridCacheContinuousQueryAtomicOffheapTieredTest extends GridCacheContinuousQueryAtomicSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_TIERED; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java new file mode 100644 index 0000000..4002435 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAtomicOffheapValuesTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_VALUES; + +/** + * + */ +public class GridCacheContinuousQueryAtomicOffheapValuesTest extends GridCacheContinuousQueryAtomicSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_VALUES; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java new file mode 100644 index 0000000..bcba7b6 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxOffheapTieredTest.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheMemoryMode; + +import static org.apache.ignite.cache.CacheMemoryMode.OFFHEAP_TIERED; + +/** + * + */ +public class GridCacheContinuousQueryTxOffheapTieredTest extends GridCacheContinuousQueryTxSelfTest { + /** {@inheritDoc} */ + @Override protected CacheMemoryMode memoryMode() { + return OFFHEAP_TIERED; + } +}
