This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b7a122  IGNITE-955 Local listener in continuous queries should not be 
mandatory (#5683)
5b7a122 is described below

commit 5b7a1221ed4f8b32012a0ff8c89ae40e9c037cec
Author: Petrov Mikhail <32207922+ololo3...@users.noreply.github.com>
AuthorDate: Tue Feb 19 11:18:38 2019 +0300

    IGNITE-955 Local listener in continuous queries should not be mandatory 
(#5683)
---
 .../processors/cache/IgniteCacheProxyImpl.java     |  14 +-
 .../continuous/CacheContinuousQueryHandler.java    |   2 +-
 .../continuous/CacheContinuousQueryHandlerV3.java  |   1 -
 .../continuous/CacheContinuousQueryManager.java    |   5 -
 .../GridCacheContinuousQueryAbstractSelfTest.java  | 175 +++++++++++++++++++++
 5 files changed, 186 insertions(+), 11 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index a305e7c..afcd60e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -659,8 +659,12 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
         if (qry instanceof ContinuousQuery) {
             ContinuousQuery<K, V> qry0 = (ContinuousQuery<K, V>)qry;
 
-            if (qry0.getLocalListener() == null)
-                throw new IgniteException("Mandatory local listener is not set 
for the query: " + qry);
+            if (qry0.getLocalListener() == null &&
+                qry0.getRemoteFilterFactory() == null &&
+                qry0.getRemoteFilter() == null) {
+                throw new IgniteException("LocalListener, RemoterFilter " +
+                    "or RemoteFilterFactory must be specified for the query: " 
+ qry);
+            }
 
             if (qry0.getRemoteFilter() != null && 
qry0.getRemoteFilterFactory() != null)
                 throw new IgniteException("Should be used either RemoterFilter 
or RemoteFilterFactory.");
@@ -672,8 +676,10 @@ public class IgniteCacheProxyImpl<K, V> extends 
AsyncSupportAdapter<IgniteCache<
         else {
             ContinuousQueryWithTransformer<K, V, ?> qry0 = 
(ContinuousQueryWithTransformer<K, V, ?>)qry;
 
-            if (qry0.getLocalListener() == null)
-                throw new IgniteException("Mandatory local transformed event 
listener is not set for the query: " + qry);
+            if (qry0.getLocalListener() == null && 
qry0.getRemoteFilterFactory() == null) {
+                throw new IgniteException("LocalListener " +
+                    "or RemoteFilterFactory must be specified for the query: " 
+ qry);
+            }
 
             if (qry0.getRemoteTransformerFactory() == null)
                 throw new IgniteException("Mandatory RemoteTransformerFactory 
is not set for the query: " + qry);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 7972c150..b87858e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -993,7 +993,7 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
         @Nullable IgniteClosure<CacheEntryEvent<? extends K, ? extends V>, ?> 
trans) {
         EventListener locTransLsnr = localTransformedEventListener();
 
-        assert (locLsnr != null && locTransLsnr == null) || (locLsnr == null 
&& locTransLsnr != null);
+        assert locLsnr == null || locTransLsnr == null;
 
         if (F.isEmpty(evts))
             return;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
index 78228f3..0008cfc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandlerV3.java
@@ -94,7 +94,6 @@ public class CacheContinuousQueryHandlerV3<K, V> extends 
CacheContinuousQueryHan
             ignoreClsNotFound,
             null);
 
-        assert locTransLsnr != null;
         assert rmtTransFactory != null;
 
         this.locTransLsnr = locTransLsnr;
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index b9f8fc5..0ae980a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -530,8 +530,6 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         if (rmtTransFactory != null) {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
-                    assert locTransLsnr != null;
-
                     return new CacheContinuousQueryHandlerV3(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), 
seq.getAndIncrement()),
@@ -548,8 +546,6 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         else if (rmtFilterFactory != null) {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
-                    assert locLsnr != null;
-
                     return new CacheContinuousQueryHandlerV2(
                         cctx.name(),
                         TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), 
seq.getAndIncrement()),
@@ -566,7 +562,6 @@ public class CacheContinuousQueryManager extends 
GridCacheManagerAdapter {
         else {
             clsr = new IgniteOutClosure<CacheContinuousQueryHandler>() {
                 @Override public CacheContinuousQueryHandler apply() {
-                    assert locLsnr != null;
                     assert locTransLsnr == null;
 
                     return new CacheContinuousQueryHandler(
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
index 16bbf87..095b976 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.Cache;
 import javax.cache.configuration.Factory;
+import javax.cache.configuration.FactoryBuilder;
 import javax.cache.event.CacheEntryEvent;
 import javax.cache.event.CacheEntryUpdatedListener;
 import javax.cache.event.EventType;
@@ -47,6 +48,7 @@ import 
org.apache.ignite.cache.CacheEntryEventSerializableFilter;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.CachePeekMode;
 import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.ContinuousQueryWithTransformer;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.ScanQuery;
 import org.apache.ignite.cache.store.CacheStore;
@@ -65,8 +67,10 @@ import 
org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.P2;
 import org.apache.ignite.internal.util.typedef.PA;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
 import org.apache.ignite.testframework.GridTestUtils;
@@ -98,6 +102,9 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     /** */
     private static final String NO_CACHE_IGNITE_INSTANCE_NAME = "noCacheGrid";
 
+    /**
+     * Entries (key to value) recorded by the remote filters of the continuous query tests in this class.
+     * Each {@code doQueryWith...} helper clears it before running.
+     */
+    private static final Map<Object, Object> FILTERED = new 
ConcurrentHashMap<>();
+
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
@@ -1196,6 +1203,174 @@ public abstract class 
GridCacheContinuousQueryAbstractSelfTest extends GridCommo
     }
 
     /**
+     * Runs {@code doQueryWithRemoteFilterFactory} for all four combinations of its two flags:
+     * local listener set or not, remote filter returning {@code true} or {@code false}.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testQueryWithRemoteFilterFactory() throws Exception {
+        doQueryWithRemoteFilterFactory(true, true);
+        doQueryWithRemoteFilterFactory(true, false);
+        doQueryWithRemoteFilterFactory(false, true);
+        doQueryWithRemoteFilterFactory(false, false);
+    }
+
+    /**
+     * Runs {@code doQueryWithRemoteFilter} for all four combinations of its two flags:
+     * local listener set or not, remote filter returning {@code true} or {@code false}.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testQueryWithRemoteFilter() throws Exception {
+        doQueryWithRemoteFilter(true, true);
+        doQueryWithRemoteFilter(true, false);
+        doQueryWithRemoteFilter(false, true);
+        doQueryWithRemoteFilter(false, false);
+    }
+
+    /**
+     * Runs {@code doQueryWithRemoteTransformer} for all four combinations of its two flags:
+     * local listener set or not, remote filter returning {@code true} or {@code false}.
+     *
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testQueryWithRemoteTransformer() throws Exception{
+        doQueryWithRemoteTransformer(true, true);
+        doQueryWithRemoteTransformer(true, false);
+        doQueryWithRemoteTransformer(false, true);
+        doQueryWithRemoteTransformer(false, false);
+    }
+
+    /**
+     * Starts a {@link ContinuousQuery} configured with a remote filter factory and, optionally, a local listener
+     * on the default cache of {@code grid(0)}, then checks the results with {@code checkLsnrAndFilterResults}
+     * while the query cursor is open.
+     *
+     * @param setLocLsnr Whether local listener should be set.
+     * @param bypassFilter Value the remote filter returns for every event ({@code true} means the test expects
+     *      events to reach the local listener, if one is set).
+     * @throws Exception if failed.
+     */
+    private void doQueryWithRemoteFilterFactory(boolean setLocLsnr, boolean 
bypassFilter) throws Exception {
+        FILTERED.clear(); // Drop entries recorded by earlier invocations.
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        Map<Integer, Integer> listened = new ConcurrentHashMap<>();
+
+        if (setLocLsnr) {
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, 
Integer>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends Integer, ? extends Integer>> evts) {
+                    evts.forEach(event -> listened.put(event.getKey(), 
event.getValue()));
+                }
+            });
+        }
+
+        qry.setRemoteFilterFactory(
+            
FactoryBuilder.factoryOf((CacheEntryEventSerializableFilter<Integer, 
Integer>)evt -> {
+                FILTERED.put(evt.getKey(), evt.getValue()); // Record that the filter saw this event.
+
+                return bypassFilter;
+            }));
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> qryCursor = 
grid(0).cache(DEFAULT_CACHE_NAME).query(qry)) {
+            checkLsnrAndFilterResults(setLocLsnr, bypassFilter, listened);
+        }
+    }
+
+    /**
+     * Starts a {@link ContinuousQuery} configured with a remote filter (set directly, not through a factory) and,
+     * optionally, a local listener on the default cache of {@code grid(0)}, then checks the results with
+     * {@code checkLsnrAndFilterResults} while the query cursor is open.
+     *
+     * @param setLocLsnr Whether local listener should be set.
+     * @param bypassFilter Value the remote filter returns for every event ({@code true} means the test expects
+     *      events to reach the local listener, if one is set).
+     * @throws Exception if failed.
+     */
+    private void doQueryWithRemoteFilter(boolean setLocLsnr, boolean 
bypassFilter) throws Exception {
+        FILTERED.clear(); // Drop entries recorded by earlier invocations.
+
+        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
+
+        Map<Integer, Integer> listened = new ConcurrentHashMap<>();
+
+        if (setLocLsnr) {
+            qry.setLocalListener(new CacheEntryUpdatedListener<Integer, 
Integer>() {
+                @Override public void onUpdated(Iterable<CacheEntryEvent<? 
extends Integer, ? extends Integer>> evts) {
+                    evts.forEach(event -> listened.put(event.getKey(), 
event.getValue()));
+                }
+            });
+        }
+
+        qry.setRemoteFilter(evt -> {
+                FILTERED.put(evt.getKey(), evt.getValue()); // Record that the filter saw this event.
+
+                return bypassFilter;
+            });
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> qryCursor = 
grid(0).cache(DEFAULT_CACHE_NAME).query(qry)) {
+            checkLsnrAndFilterResults(setLocLsnr, bypassFilter, listened);
+        }
+    }
+
+    /**
+     * Starts a {@link ContinuousQueryWithTransformer} configured with a remote filter factory, a remote
+     * transformer factory and, optionally, a local listener on the default cache of {@code grid(0)}, then checks
+     * the results with {@code checkLsnrAndFilterResults} while the query cursor is open.
+     * <p>
+     * The transformer copies the key and value of each event into a new {@code T2}.
+     *
+     * @param setLocLsnr Whether local listener should be set.
+     * @param bypassFilter Value the remote filter returns for every event ({@code true} means the test expects
+     *      events to reach the local listener, if one is set).
+     * @throws Exception if failed.
+     */
+    private void doQueryWithRemoteTransformer(boolean setLocLsnr, boolean 
bypassFilter) throws Exception {
+        FILTERED.clear(); // Drop entries recorded by earlier invocations.
+
+        ContinuousQueryWithTransformer<Integer, Integer, T2<Integer, Integer>> 
qry =
+            new ContinuousQueryWithTransformer<>();
+
+        Map<Integer, Integer> listened = new ConcurrentHashMap<>();
+
+        if (setLocLsnr) {
+            qry.setLocalListener(evts -> {
+                evts.forEach(event ->
+                    listened.put(event.getKey(), event.getValue()));
+            });
+        }
+
+        qry.setRemoteFilterFactory(
+            
FactoryBuilder.factoryOf((CacheEntryEventSerializableFilter<Integer, 
Integer>)evt -> {
+                FILTERED.put(evt.getKey(), evt.getValue()); // Record that the filter saw this event.
+
+                return bypassFilter;
+            }));
+
+        qry.setRemoteTransformerFactory(FactoryBuilder.factoryOf(
+            new IgniteClosure<CacheEntryEvent<? extends Integer, ? extends 
Integer>, T2<Integer, Integer>>() {
+                @Override public T2<Integer, Integer> apply(CacheEntryEvent<? 
extends Integer, ? extends Integer> evt) {
+                    T2<Integer, Integer> res = new T2<>();
+
+                    res.put(evt.getKey(), evt.getValue());
+
+                    return res;
+                }
+            }));
+
+        try (QueryCursor<Cache.Entry<Integer, Integer>> qryCursor = 
grid(0).cache(DEFAULT_CACHE_NAME).query(qry)) {
+            checkLsnrAndFilterResults(setLocLsnr, bypassFilter, listened);
+        }
+    }
+
+    /**
+     * Puts the entries {@code (1, 1)} and {@code (2, 2)} into the default cache of {@code grid(0)} and verifies
+     * what the remote filter and the local listener observed.
+     * <p>
+     * First waits, for at most {@code getTestTimeout()}, until {@code FILTERED} equals the entries that were put.
+     * Then, if both flags are {@code true}, waits until {@code listened} equals the same entries; otherwise
+     * asserts once, without waiting, that {@code listened} is empty.
+     *
+     * @param setLocLsnr Whether local listener was set.
+     * @param bypassFilter Value the remote filter returned for every event.
+     * @param listened Entries got by listener.
+     * @throws Exception if failed.
+     */
+    private void checkLsnrAndFilterResults(boolean setLocLsnr, boolean 
bypassFilter, Map<Integer, Integer> listened)
+        throws Exception {
+        Map<Integer, Integer> expected = new HashMap<>();
+
+        expected.put(1, 1);
+        expected.put(2, 2);
+
+        expected.forEach((key, val) ->
+            grid(0).<Integer, Integer>cache(DEFAULT_CACHE_NAME).put(key, val));
+
+        assertTrue(GridTestUtils.waitForCondition(
+            () -> FILTERED.size() == expected.size() &&
+                FILTERED.equals(expected), getTestTimeout()));
+
+        if (bypassFilter && setLocLsnr) {
+            assertTrue(GridTestUtils.waitForCondition(
+                () -> listened.size() == expected.size() &&
+                    listened.equals(expected), getTestTimeout()));
+        }
+        else
+            assertTrue(listened.isEmpty()); // Single check: nothing is expected to reach the listener.
+    }
+
+    /**
      *
      */
     private static class StoreFactory implements Factory<CacheStore> {

Reply via email to