This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 5b7a122 IGNITE-955 Local listener in continuous queries should not be mandatory (#5683) 5b7a122 is described below commit 5b7a1221ed4f8b32012a0ff8c89ae40e9c037cec Author: Petrov Mikhail <32207922+ololo3...@users.noreply.github.com> AuthorDate: Tue Feb 19 11:18:38 2019 +0300 IGNITE-955 Local listener in continuous queries should not be mandatory (#5683) --- .../processors/cache/IgniteCacheProxyImpl.java | 14 +- .../continuous/CacheContinuousQueryHandler.java | 2 +- .../continuous/CacheContinuousQueryHandlerV3.java | 1 - .../continuous/CacheContinuousQueryManager.java | 5 - .../GridCacheContinuousQueryAbstractSelfTest.java | 175 +++++++++++++++++++++ 5 files changed, 186 insertions(+), 11 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java index a305e7c..afcd60e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java @@ -659,8 +659,12 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< if (qry instanceof ContinuousQuery) { ContinuousQuery<K, V> qry0 = (ContinuousQuery<K, V>)qry; - if (qry0.getLocalListener() == null) - throw new IgniteException("Mandatory local listener is not set for the query: " + qry); + if (qry0.getLocalListener() == null && + qry0.getRemoteFilterFactory() == null && + qry0.getRemoteFilter() == null) { + throw new IgniteException("LocalListener, RemoterFilter " + + "or RemoteFilterFactory must be specified for the query: " + qry); + } if (qry0.getRemoteFilter() != null && qry0.getRemoteFilterFactory() != null) throw new IgniteException("Should be used either RemoterFilter or RemoteFilterFactory."); @@ -672,8 +676,10 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache< else { ContinuousQueryWithTransformer<K, V, ?> qry0 = (ContinuousQueryWithTransformer<K, V, ?>)qry; - if (qry0.getLocalListener() == null) - throw new IgniteException("Mandatory local transformed event listener is not set for the query: " + qry); + if (qry0.getLocalListener() == null && qry0.getRemoteFilterFactory() == null) { + throw new IgniteException("LocalListener " + + "or RemoteFilterFactory must be specified for the query: " + qry); + } if (qry0.getRemoteTransformerFactory() == null) throw new IgniteException("Mandatory RemoteTransformerFactory is not set for the query: " + qry); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 7972c150..b87858e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -993,7 +993,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler @Nullable IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> trans) { EventListener locTransLsnr = localTransformedEventListener(); - assert (locLsnr != null && locTransLsnr == null) || (locLsnr == null && locTransLsnr != null); + assert locLsnr == null || locTransLsnr == null; if (F.isEmpty(evts)) return; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java index 78228f3..0008cfc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java @@ -94,7 +94,6 @@ public class CacheContinuousQueryHandlerV3<K, V> extends CacheContinuousQueryHan ignoreClsNotFound, null); - assert locTransLsnr != null; assert rmtTransFactory != null; this.locTransLsnr = locTransLsnr; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index b9f8fc5..0ae980a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -530,8 +530,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (rmtTransFactory != null) { clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { @Override public CacheContinuousQueryHandler apply() { - assert locTransLsnr != null; - return new CacheContinuousQueryHandlerV3( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -548,8 +546,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { else if (rmtFilterFactory != null) { clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { @Override public CacheContinuousQueryHandler apply() { - assert locLsnr != null; - return new CacheContinuousQueryHandlerV2( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -566,7 +562,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { else { clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() { @Override public CacheContinuousQueryHandler apply() { - assert locLsnr != null; assert locTransLsnr == null; return new CacheContinuousQueryHandler( diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 16bbf87..095b976 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.Cache; import javax.cache.configuration.Factory; +import javax.cache.configuration.FactoryBuilder; import javax.cache.event.CacheEntryEvent; import javax.cache.event.CacheEntryUpdatedListener; import javax.cache.event.EventType; @@ -47,6 +48,7 @@ import org.apache.ignite.cache.CacheEntryEventSerializableFilter; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.CachePeekMode; import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.cache.query.ContinuousQueryWithTransformer; import org.apache.ignite.cache.query.QueryCursor; import org.apache.ignite.cache.query.ScanQuery; import org.apache.ignite.cache.store.CacheStore; @@ -65,8 +67,10 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessor; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P2; import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiInClosure; +import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.testframework.GridTestUtils; @@ -98,6 +102,9 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo /** */ private static final String NO_CACHE_IGNITE_INSTANCE_NAME = "noCacheGrid"; + /** Map of filtered entries. */ + private static final Map<Object, Object> FILTERED = new ConcurrentHashMap<>(); + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { @@ -1196,6 +1203,174 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo } /** + * @throws Exception if failed. + */ + @Test + public void testQueryWithRemoteFilterFactory() throws Exception { + doQueryWithRemoteFilterFactory(true, true); + doQueryWithRemoteFilterFactory(true, false); + doQueryWithRemoteFilterFactory(false, true); + doQueryWithRemoteFilterFactory(false, false); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testQueryWithRemoteFilter() throws Exception { + doQueryWithRemoteFilter(true, true); + doQueryWithRemoteFilter(true, false); + doQueryWithRemoteFilter(false, true); + doQueryWithRemoteFilter(false, false); + } + + /** + * @throws Exception if failed. + */ + @Test + public void testQueryWithRemoteTransformer() throws Exception{ + doQueryWithRemoteTransformer(true, true); + doQueryWithRemoteTransformer(true, false); + doQueryWithRemoteTransformer(false, true); + doQueryWithRemoteTransformer(false, false); + } + + /** + * @throws Exception if failed. + * @param bypassFilter Whether remote filter should be bypassed. + * @param setLocLsnr Whether local listner should be setted. + */ + private void doQueryWithRemoteFilterFactory(boolean setLocLsnr, boolean bypassFilter) throws Exception { + FILTERED.clear(); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + Map<Integer, Integer> listened = new ConcurrentHashMap<>(); + + if (setLocLsnr) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + evts.forEach(event -> listened.put(event.getKey(), event.getValue())); + } + }); + } + + qry.setRemoteFilterFactory( + FactoryBuilder.factoryOf((CacheEntryEventSerializableFilter<Integer, Integer>)evt -> { + FILTERED.put(evt.getKey(), evt.getValue()); + + return bypassFilter; + })); + + try (QueryCursor<Cache.Entry<Integer, Integer>> qryCursor = grid(0).cache(DEFAULT_CACHE_NAME).query(qry)) { + checkLsnrAndFilterResults(setLocLsnr, bypassFilter, listened); + } + } + + /** + * @throws Exception if failed. + * @param bypassFilter Whether remote filter should be bypassed. + * @param setLocLsnr Whether local listner should be setted. + */ + private void doQueryWithRemoteFilter(boolean setLocLsnr, boolean bypassFilter) throws Exception { + FILTERED.clear(); + + ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>(); + + Map<Integer, Integer> listened = new ConcurrentHashMap<>(); + + if (setLocLsnr) { + qry.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + evts.forEach(event -> listened.put(event.getKey(), event.getValue())); + } + }); + } + + qry.setRemoteFilter(evt -> { + FILTERED.put(evt.getKey(), evt.getValue()); + + return bypassFilter; + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> qryCursor = grid(0).cache(DEFAULT_CACHE_NAME).query(qry)) { + checkLsnrAndFilterResults(setLocLsnr, bypassFilter, listened); + } + } + + /** + * @throws Exception if failed. + * @param bypassFilter Whether remote filter should be bypassed. + * @param setLocLsnr Whether local listner should be setted. + */ + private void doQueryWithRemoteTransformer(boolean setLocLsnr, boolean bypassFilter) throws Exception { + FILTERED.clear(); + + ContinuousQueryWithTransformer<Integer, Integer, T2<Integer, Integer>> qry = + new ContinuousQueryWithTransformer<>(); + + Map<Integer, Integer> listened = new ConcurrentHashMap<>(); + + if (setLocLsnr) { + qry.setLocalListener(evts -> { + evts.forEach(event -> + listened.put(event.getKey(), event.getValue())); + }); + } + + qry.setRemoteFilterFactory( + FactoryBuilder.factoryOf((CacheEntryEventSerializableFilter<Integer, Integer>)evt -> { + FILTERED.put(evt.getKey(), evt.getValue()); + + return bypassFilter; + })); + + qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf( + new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends Integer>, T2<Integer, Integer>>() { + @Override public T2<Integer, Integer> apply(CacheEntryEvent<? extends Integer, ? extends Integer> evt) { + T2<Integer, Integer> res = new T2<>(); + + res.put(evt.getKey(), evt.getValue()); + + return res; + } + })); + + try (QueryCursor<Cache.Entry<Integer, Integer>> qryCursor = grid(0).cache(DEFAULT_CACHE_NAME).query(qry)) { + checkLsnrAndFilterResults(setLocLsnr, bypassFilter, listened); + } + } + + /** + * @param setLocLsnr Whether local listner was setted. + * @param bypassFilter Whether remote filter was bypassed. + * @param listened Entries got by listener. + * @throws Exception if failed. + */ + private void checkLsnrAndFilterResults(boolean setLocLsnr, boolean bypassFilter, Map<Integer, Integer> listened) + throws Exception { + Map<Integer, Integer> expected = new HashMap<>(); + + expected.put(1, 1); + expected.put(2, 2); + + expected.forEach((key, val) -> + grid(0).<Integer, Integer>cache(DEFAULT_CACHE_NAME).put(key, val)); + + assertTrue(GridTestUtils.waitForCondition( + () -> FILTERED.size() == expected.size() && + FILTERED.equals(expected), getTestTimeout())); + + if (bypassFilter && setLocLsnr) { + assertTrue(GridTestUtils.waitForCondition( + () -> listened.size() == expected.size() && + listened.equals(expected), getTestTimeout())); + } + else + assertTrue(listened.isEmpty()); + } + + /** * */ private static class StoreFactory implements Factory<CacheStore> {