IGNITE-1536 - Removed duplicated continuous query notifications in REPLICATED cache
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/7db44f11 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/7db44f11 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/7db44f11 Branch: refs/heads/ignite-1272 Commit: 7db44f11f7925b5a29a0a3e017baa93b52fb2982 Parents: 70a8a92 Author: Valentin Kulichenko <valentin.kuliche...@gmail.com> Authored: Wed Sep 23 18:53:06 2015 -0700 Committer: Valentin Kulichenko <valentin.kuliche...@gmail.com> Committed: Wed Sep 23 18:53:06 2015 -0700 ---------------------------------------------------------------------- .../processors/cache/IgniteCacheProxy.java | 4 +- .../continuous/CacheContinuousQueryManager.java | 58 +++------ .../continuous/GridContinuousProcessor.java | 3 +- ...ontinuousQueryReplicatedOneNodeSelfTest.java | 120 +++++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 4 +- 5 files changed, 144 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java index cc6c19a..ae96f23 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java @@ -556,7 +556,7 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V qry.getPageSize(), qry.getTimeInterval(), qry.isAutoUnsubscribe(), - loc ? ctx.grid().cluster().forLocal() : null); + loc); final QueryCursor<Cache.Entry<K, V>> cur = qry.getInitialQuery() != null ? query(qry.getInitialQuery()) : null; @@ -1896,4 +1896,4 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V @Override public String toString() { return S.toString(IgniteCacheProxy.class, this); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index c719f1e..6a151a5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -43,10 +43,9 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheEntryEventSerializableFilter; +import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cache.query.ContinuousQuery; -import org.apache.ignite.cluster.ClusterGroup; import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.cluster.ClusterTopologyException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -55,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.LoggerResource; import org.jsr166.ConcurrentHashMap8; @@ -271,7 +271,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param bufSize Buffer size. * @param timeInterval Time interval. * @param autoUnsubscribe Auto unsubscribe flag. - * @param grp Cluster group. + * @param loc Local flag. * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ @@ -280,7 +280,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { int bufSize, long timeInterval, boolean autoUnsubscribe, - ClusterGroup grp) throws IgniteCheckedException + boolean loc) throws IgniteCheckedException { return executeQuery0( locLsnr, @@ -293,7 +293,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { true, false, true, - grp); + loc); } /** @@ -321,7 +321,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { true, false, true, - loc ? cctx.grid().cluster().forLocal() : null); + loc); } /** @@ -383,7 +383,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @param oldValRequired Old value required flag. * @param sync Synchronous flag. * @param ignoreExpired Ignore expired event flag. - * @param grp Cluster group. + * @param loc Local flag. * @return Continuous routine ID. * @throws IgniteCheckedException In case of error. */ @@ -397,44 +397,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { boolean oldValRequired, boolean sync, boolean ignoreExpired, - ClusterGroup grp) throws IgniteCheckedException + boolean loc) throws IgniteCheckedException { cctx.checkSecurity(SecurityPermission.CACHE_READ); - if (grp == null) - grp = cctx.kernalContext().grid().cluster(); - - Collection<ClusterNode> nodes = grp.nodes(); - - if (nodes.isEmpty()) - throw new ClusterTopologyException("Failed to execute continuous query (empty cluster group is " + - "provided)."); - - boolean skipPrimaryCheck = false; - - switch (cctx.config().getCacheMode()) { - case LOCAL: - if (!nodes.contains(cctx.localNode())) - throw new ClusterTopologyException("Continuous query for LOCAL cache can be executed " + - "only locally (provided projection contains remote nodes only)."); - else if (nodes.size() > 1) - U.warn(log, "Continuous query for LOCAL cache will be executed locally (provided projection is " + - "ignored)."); - - grp = grp.forNode(cctx.localNode()); - - break; - - case REPLICATED: - if (nodes.size() == 1 && F.first(nodes).equals(cctx.localNode())) - skipPrimaryCheck = cctx.affinityNode(); - - break; - } - int taskNameHash = !internal && cctx.kernalContext().security().enabled() ? cctx.kernalContext().job().currentTaskNameHash() : 0; + boolean skipPrimaryCheck = loc && cctx.config().getCacheMode() == CacheMode.REPLICATED && cctx.affinityNode(); + GridContinuousHandler hnd = new CacheContinuousQueryHandler( cctx.name(), TOPIC_CACHE.topic(topicPrefix, cctx.localNodeId(), seq.getAndIncrement()), @@ -448,12 +419,17 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { taskNameHash, skipPrimaryCheck); + IgnitePredicate<ClusterNode> pred = null; + + if (loc || cctx.config().getCacheMode() == CacheMode.LOCAL) + pred = F.nodeForNodeId(cctx.localNodeId()); + UUID id = cctx.kernalContext().continuous().startRoutine( hnd, bufSize, timeInterval, autoUnsubscribe, - grp.predicate()).get(); + pred).get(); if (notifyExisting) { final Iterator<GridCacheEntryEx> it = cctx.cache().allEntries().iterator(); @@ -635,7 +611,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { cfg.isOldValueRequired(), cfg.isSynchronous(), false, - null); + false); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 18c1f36..e29bdd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -795,7 +795,8 @@ public class GridContinuousProcessor extends GridProcessorAdapter { try { IgnitePredicate<ClusterNode> prjPred = data.projectionPredicate(); - ctx.resource().injectGeneric(prjPred); + if (prjPred != null) + ctx.resource().injectGeneric(prjPred); if (prjPred == null || prjPred.apply(ctx.discovery().node(ctx.localNodeId()))) { registered = registerHandler(node.id(), routineId, hnd, data.bufferSize(), data.interval(), http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java new file mode 100644 index 0000000..8152b2a --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryReplicatedOneNodeSelfTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryListenerException; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.query.ContinuousQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +/** + * Test for replicated cache with one node. + */ +public class GridCacheContinuousQueryReplicatedOneNodeSelfTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(CacheMode.REPLICATED); + cacheCfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + + cfg.setCacheConfiguration(cacheCfg); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(IP_FINDER); + + cfg.setDiscoverySpi(disco); + + return cfg; + } + + /** + * @throws Exception If failed. + */ + public void testLocal() throws Exception { + doTest(true); + } + + /** + * @throws Exception If failed. + */ + public void testDistributed() throws Exception { + doTest(false); + } + + /** + * @throws Exception If failed. + */ + private void doTest(boolean loc) throws Exception { + try { + IgniteCache<String, Integer> cache = startGrid(0).cache(null); + + ContinuousQuery<String, Integer> qry = new ContinuousQuery<>(); + + final AtomicInteger cnt = new AtomicInteger(); + final CountDownLatch latch = new CountDownLatch(10); + + qry.setLocalListener(new CacheEntryUpdatedListener<String, Integer>() { + @Override + public void onUpdated(Iterable<CacheEntryEvent<? extends String, ? extends Integer>> evts) + throws CacheEntryListenerException { + for (CacheEntryEvent<? extends String, ? extends Integer> evt : evts) { + cnt.incrementAndGet(); + latch.countDown(); + } + } + }); + + cache.query(qry.setLocal(loc)); + + startGrid(1); + + awaitPartitionMapExchange(); + + for (int i = 0; i < 10; i++) + cache.put("key" + i, i); + + assert latch.await(5000, TimeUnit.MILLISECONDS); + + assertEquals(10, cnt.get()); + } + finally { + stopAllGrids(); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/7db44f11/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 41670d1..fe54b63 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -73,6 +73,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryPartitionedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest; @@ -158,6 +159,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class); suite.addTestSuite(IgniteCacheContinuousQueryClientTest.class); + suite.addTestSuite(GridCacheContinuousQueryReplicatedOneNodeSelfTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class); @@ -187,4 +189,4 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { return suite; } -} \ No newline at end of file +}