Repository: ignite Updated Branches: refs/heads/ignite-2384 [created] 0092220cb
IGNITE-2384 Fixed bug when continuous query init. Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0092220c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0092220c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0092220c Branch: refs/heads/ignite-2384 Commit: 0092220cbd8da494cd3632ece3b31a901ad0318a Parents: fa427dc Author: Tikhonov Nikolay <[email protected]> Authored: Fri Jan 15 13:20:56 2016 +0300 Committer: Tikhonov Nikolay <[email protected]> Committed: Fri Jan 15 13:20:56 2016 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 3 +- .../internal/GridMessageListenHandler.java | 6 +- .../continuous/CacheContinuousQueryHandler.java | 14 +- .../continuous/GridContinuousHandler.java | 4 +- .../continuous/GridContinuousProcessor.java | 2 +- .../CacheContinuousIssueSelfTest.java | 224 +++++++++++++++++++ .../CacheContinuousIssueTxSelfTest.java | 34 +++ .../IgniteBinaryCacheQueryTestSuite.java | 4 + 8 files changed, 282 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0092220c/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index be35ba4..69af6cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -35,6 +35,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; @@ -135,7 +136,7 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { // No-op. } http://git-wip-us.apache.org/repos/asf/ignite/blob/0092220c/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index 2edfda5..13aeb54 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; @@ -108,12 +109,13 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ - @Override public void updateCounters(Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { // No-op. } /** {@inheritDoc} */ - @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { + @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) + throws IgniteCheckedException { ctx.io().addUserMessageListener(topic, pred); return RegisterStatus.REGISTERED; http://git-wip-us.apache.org/repos/asf/ignite/blob/0092220c/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index ad86d65..1753b26 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -148,6 +148,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler /** */ private Map<Integer, Long> initUpdCntrs; + /** */ + private AffinityTopologyVersion initTopVer; + /** * Required by {@link Externalizable}. */ @@ -238,8 +241,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler } /** {@inheritDoc} */ - @Override public void updateCounters(Map<Integer, Long> cntrs) { + @Override public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs) { this.initUpdCntrs = cntrs; + this.initTopVer = topVer; } /** {@inheritDoc} */ @@ -384,7 +388,9 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler if (!internal) { entry.markBackup(); - backupQueue.add(entry); + // Skip init query and expire entries. + if (entry.updateCounter() != -1) + backupQueue.add(entry); } } } @@ -483,7 +489,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler e = buf.skipEntry(e); - if (e != null) + if (e != null && !ctx.localNodeId().equals(nodeId)) ctx.continuous().addNotification(nodeId, routineId, e, topic, sync, true); } catch (ClusterTopologyCheckedException ex) { @@ -650,7 +656,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler PartitionRecovery rec = rcvs.get(e.partition()); if (rec == null) { - rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx).topology().topologyVersion(), + rec = new PartitionRecovery(ctx.log(getClass()), initTopVer, initUpdCntrs == null ? null : initUpdCntrs.get(e.partition())); PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); http://git-wip-us.apache.org/repos/asf/ignite/blob/0092220c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 900835a..8cd30a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.jetbrains.annotations.Nullable; /** @@ -154,6 +155,7 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { /** * @param cntrs Init state for partition counters. + * @param topVer Topology version. */ - public void updateCounters(Map<Integer, Long> cntrs); + public void updateCounters(AffinityTopologyVersion topVer, Map<Integer, Long> cntrs); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0092220c/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 9bc9a38..4825050 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -231,7 +231,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } - routine.handler().updateCounters(msg.updateCounters()); + routine.handler().updateCounters(topVer, msg.updateCounters()); } fut.onRemoteRegistered(); http://git-wip-us.apache.org/repos/asf/ignite/blob/0092220c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueSelfTest.java new file mode 100644 index 0000000..e1000e4 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueSelfTest.java @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicInteger; +import javax.cache.configuration.MutableCacheEntryListenerConfiguration; +import javax.cache.event.CacheEntryCreatedListener; +import javax.cache.event.CacheEntryEvent; +import javax.cache.event.CacheEntryExpiredListener; +import javax.cache.event.CacheEntryRemovedListener; +import javax.cache.event.CacheEntryUpdatedListener; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.cache.CacheRebalanceMode; +import org.apache.ignite.cache.CacheWriteSynchronizationMode; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.util.typedef.PA; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static javax.cache.configuration.FactoryBuilder.factoryOf; + +/** + * Test from https://issues.apache.org/jira/browse/IGNITE-2384. + */ +public class CacheContinuousIssueSelfTest extends GridCommonAbstractTest { + /** */ + static public TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** Cache name. */ + public static final String CACHE_NAME = "test_cache"; + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + startGridsMultiThreaded(2); + + awaitPartitionMapExchange(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testEvent() throws Exception { + IgniteCache<Integer, String> cache1 = grid(0).getOrCreateCache(CACHE_NAME); + + final AllEventListener<Integer, String> lsnr1 = registerCacheListener(cache1); + + IgniteCache<Integer, String> cache2 = grid(1).getOrCreateCache(CACHE_NAME); + + int key = affinityKeyForNode(1, grid(0)); + cache1.put(key, "vodka"); + + // Note the issue is only reproducible if the second registration is done right + // here, after the first put() above. + final AllEventListener<Integer, String> lsnr2 = registerCacheListener(cache2); + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr1.createdCount.get() == 1; + } + }, 2000L) : "Unexpected number of events: " + lsnr1.createdCount.get(); + + // Sanity check. + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr2.createdCount.get() == 0; + } + }, 2000L) : "Expected no create events, but got: " + lsnr2.createdCount.get(); + + // node2 now becomes the primary for the key. + grid(0).close(); + + awaitPartitionMapExchange(); + + cache2.put(key, "peevo"); + + // Sanity check. + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr1.createdCount.get() == 1; + } + }, 2000L) : "Expected no change here, but got: " + lsnr1.createdCount.get(); + + // Sanity check. + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr2.updatedCount.get() == 0; + } + }, 2000L) : "Expected no update events, but got: " + lsnr2.updatedCount.get(); + + System.out.println(">>>>> " + lsnr2.createdCount.get()); + + // This assertion fails: 0 events get delivered. + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return lsnr2.createdCount.get() == 1; + } + }, 2000L) : "Expected a single event due to 'peevo', but got: " + lsnr2.createdCount.get(); + } + + /** + * @param cache Cache. + * @return Event listener. + */ + private AllEventListener<Integer, String> registerCacheListener( + IgniteCache<Integer, String> cache) { + AllEventListener<Integer, String> lsnr = new AllEventListener<>(); + cache.registerCacheEntryListener( + new MutableCacheEntryListenerConfiguration<>(factoryOf(lsnr), null, true, false)); + return lsnr; + } + + /** + * @param startValue Start value. + * @param node Ignite node. + * @return Primary key. + */ + private int affinityKeyForNode(int startValue, Ignite node) { + Affinity<Integer> affinity = node.affinity(CACHE_NAME); + + ClusterNode localNode = node.cluster().localNode(); + + int key; + + for (key = startValue + 1; !affinity.isPrimary(localNode, key); key++); + + return key; + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration() throws Exception { + IgniteConfiguration cfg = super.getConfiguration(); + + TcpDiscoverySpi spi = new TcpDiscoverySpi(); + + spi.setIpFinder(ipFinder); + + cfg.setDiscoverySpi(spi); + cfg.setCacheConfiguration(cache()); + + return cfg; + } + + /** + * @return Cache configuration. + */ + protected CacheConfiguration<Integer, String> cache() { + CacheConfiguration<Integer, String> cfg = new CacheConfiguration<>(CACHE_NAME); + + cfg.setCacheMode(CacheMode.PARTITIONED); + cfg.setAtomicityMode(CacheAtomicityMode.ATOMIC); + cfg.setRebalanceMode(CacheRebalanceMode.SYNC); + cfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.PRIMARY_SYNC); + cfg.setStartSize(1024); + + return cfg; + } + + /** + * Event listener. + */ + public static class AllEventListener<K, V> implements CacheEntryCreatedListener<K, V>, + CacheEntryUpdatedListener<K, V>, CacheEntryRemovedListener<K, V>, CacheEntryExpiredListener<K, V>, + Serializable { + /** */ + final AtomicInteger createdCount = new AtomicInteger(); + + /** */ + final AtomicInteger updatedCount = new AtomicInteger(); + + /** {@inheritDoc} */ + @Override public void onCreated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + createdCount.incrementAndGet(); + System.out.printf("onCreate: %s. \n", evts); + } + + /** {@inheritDoc} */ + @Override public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + System.out.printf("onExpired: %s. \n", evts); + } + + /** {@inheritDoc} */ + @Override public void onRemoved(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + System.out.printf("onRemoved: %s. \n", evts); + } + + /** {@inheritDoc} */ + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends K, ? extends V>> evts) { + updatedCount.incrementAndGet(); + System.out.printf("onUpdated: %s.", evts); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0092220c/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueTxSelfTest.java new file mode 100644 index 0000000..0323b23 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousIssueTxSelfTest.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.configuration.CacheConfiguration; + +/** + * + */ +public class CacheContinuousIssueTxSelfTest extends CacheContinuousIssueSelfTest { + @Override protected CacheConfiguration<Integer, String> cache() { + CacheConfiguration<Integer, String> ccfg = super.cache(); + + ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL); + + return ccfg; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0092220c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java index eddfcf4..2fd8dd5 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java @@ -75,6 +75,8 @@ import org.apache.ignite.internal.processors.cache.distributed.replicated.Ignite import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalAtomicQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalFieldsQuerySelfTest; import org.apache.ignite.internal.processors.cache.local.IgniteCacheLocalQuerySelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousIssueSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousIssueTxSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicPrimaryWriteOrderSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverAtomicReplicatedSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFailoverTxReplicatedSelfTest; @@ -200,6 +202,8 @@ public class IgniteBinaryCacheQueryTestSuite extends TestSuite { suite.addTestSuite(CacheContinuousQueryFailoverAtomicReplicatedSelfTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxSelfTest.class); suite.addTestSuite(CacheContinuousQueryFailoverTxReplicatedSelfTest.class); + suite.addTestSuite(CacheContinuousIssueSelfTest.class); + suite.addTestSuite(CacheContinuousIssueTxSelfTest.class); // Reduce fields queries. suite.addTestSuite(GridCacheReduceFieldsQueryLocalSelfTest.class);
