IGNITE-426 Temp
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a8882fd3 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a8882fd3 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a8882fd3 Branch: refs/heads/ignite-426-2-reb Commit: a8882fd3ba6c180c71738ae3034627bc57e24e48 Parents: 878a87b Author: nikolay_tikhonov <[email protected]> Authored: Thu Oct 22 15:17:28 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Oct 28 15:24:33 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 10 ++-- .../dht/GridDhtPartitionTopologyImpl.java | 5 ++ .../dht/atomic/GridDhtAtomicUpdateFuture.java | 1 - .../continuous/CacheContinuousQueryHandler.java | 9 ---- ...acheContinuousQueryFailoverAbstractTest.java | 14 +++-- ...ridCacheContinuousQueryAbstractSelfTest.java | 56 ++++++++++++++++++++ .../GridCacheContinuousQueryTxSelfTest.java | 49 +++++++++++++++++ .../IgniteCacheQuerySelfTestSuite.java | 2 + 8 files changed, 125 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index bbd2ce0..b445619 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1187,9 +1187,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, null, taskName); } - if (!isNear() && - // Ignore events on backups for one phase commit. - !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0)) + if (cctx.isLocal() || cctx.isReplicated() || + (!isNear() && !(tx != null && !tx.onePhaseCommit() && !tx.local()))) cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer); cctx.dataStructures().onEntryUpdated(key, false); @@ -1364,9 +1363,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme taskName); } - if (!isNear() && - // Ignore events on backups for one phase commit. - !(tx.onePhaseCommit() && updateIdx != null && updateIdx == 0)) + if (cctx.isLocal() || cctx.isReplicated() || + (!isNear() && !(tx != null && !tx.onePhaseCommit() && !tx.local()))) cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer); cctx.dataStructures().onEntryUpdated(key, true); http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index 1195ddd..a210a29 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -1356,6 +1356,11 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { if (!rebalancedTopVer.equals(topVer)) { for (int i = 0; i < cctx.affinity().partitions(); i++) { List<ClusterNode> affNodes = cctx.affinity().nodes(i, topVer); + + // Topology doesn't contain server nodes (just clients). + if (affNodes.isEmpty()) + continue; + List<ClusterNode> owners = owners(i); if (affNodes.size() != owners.size() || !owners.containsAll(affNodes)) http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 169e6a7..d9c12eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 8e20fbc..bd44180 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -63,7 +63,6 @@ import org.apache.ignite.internal.processors.platform.cache.query.PlatformContin import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -576,8 +575,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { /** */ private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>(); - private List<T2<Long, CacheContinuousQueryEntry>> firedEvents = new ArrayList<>(); - /** * @param log Logger. */ @@ -601,8 +598,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (lastFiredEvt == INIT_VALUE) { lastFiredEvt = entry.updateIndex(); - firedEvents.add(new T2<>(lastFiredEvt, entry)); - return F.asList(entry); } @@ -612,8 +607,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { lastFiredEvt = 1; - firedEvents.add(new T2<>(lastFiredEvt, entry)); - return F.asList(entry); } @@ -643,8 +636,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { entries.add(e.getValue()); - firedEvents.add(new T2<>(e.getKey(), e.getValue())); - iter.remove(); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index 6029761..6979f6a 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -1564,11 +1564,15 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo restartFut.get(); - boolean check = GridTestUtils.waitForCondition(new GridAbsPredicate() { - @Override public boolean apply() { - return checkEvents(false, expEvts, lsnr); - } - }, 10_000); + boolean check = true; + + if (!expEvts.isEmpty()) { + check = GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + return checkEvents(false, expEvts, lsnr); + } + }, 10_000); + } if (!check) assertTrue(checkEvents(true, expEvts, lsnr)); http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java index 46a5f8c..637d8a2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryAbstractSelfTest.java @@ -75,6 +75,7 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheMode.LOCAL; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC; @@ -368,6 +369,61 @@ public abstract class GridCacheContinuousQueryAbstractSelfTest extends GridCommo /** * @throws Exception If failed. */ + public void testTwoQueryListener() throws Exception { + if (cacheMode() == LOCAL) + return; + + IgniteCache<Integer, Integer> cache = grid(0).cache(null); + IgniteCache<Integer, Integer> cache1 = grid(1).cache(null); + + final AtomicInteger cntr = new AtomicInteger(0); + final AtomicInteger cntr1 = new AtomicInteger(0); + + ContinuousQuery<Integer, Integer> qry1 = new ContinuousQuery<>(); + ContinuousQuery<Integer, Integer> qry2 = new ContinuousQuery<>(); + + qry1.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts) + cntr.incrementAndGet(); + } + }); + + qry2.setLocalListener(new CacheEntryUpdatedListener<Integer, Integer>() { + @Override public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) { + for (CacheEntryEvent<? extends Integer, ? extends Integer> ignore : evts) + cntr1.incrementAndGet(); + } + }); + + try (QueryCursor<Cache.Entry<Integer, Integer>> query2 = cache1.query(qry2); + QueryCursor<Cache.Entry<Integer, Integer>> query1 = cache.query(qry1)) { + for (int i = 0; i < gridCount(); i++) { + IgniteCache<Object, Object> cache0 = grid(i).cache(null); + + cache0.put(1, 1); + cache0.put(2, 2); + cache0.put(3, 3); + + cache0.remove(1); + cache0.remove(2); + cache0.remove(3); + + final int iter = i + 1; + + assert GridTestUtils.waitForCondition(new PA() { + @Override public boolean apply() { + return iter * 6 /* count operation */ * 2 /* count continues queries*/ + == (cntr.get() + cntr1.get()); + } + }, 5000L); + } + } + } + + /** + * @throws Exception If failed. + */ public void testEntriesByFilter() throws Exception { IgniteCache<Integer, Integer> cache = grid(0).cache(null); http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java new file mode 100644 index 0000000..91b6b9c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/GridCacheContinuousQueryTxSelfTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheMode; +import org.apache.ignite.configuration.NearCacheConfiguration; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +/** + * Continuous queries tests for atomic cache. + */ +public class GridCacheContinuousQueryTxSelfTest extends GridCacheContinuousQueryPartitionedSelfTest { + /** {@inheritDoc} */ + @Override protected CacheAtomicityMode atomicityMode() { + return TRANSACTIONAL; + } + + /** {@inheritDoc} */ + @Override protected CacheMode cacheMode() { + return CacheMode.REPLICATED; + } + + /** {@inheritDoc} */ + @Override protected NearCacheConfiguration nearConfiguration() { + return null; + } + + /** {@inheritDoc} */ + @Override public void testInternalKey() throws Exception { + // No-op. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/a8882fd3/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java index 91dc388..e16dffc 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java @@ -81,6 +81,7 @@ import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedOneNodeSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedP2PDisabledSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedSelfTest; +import org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryTxSelfTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientReconnectTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest; import org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTxReconnectTest; @@ -163,6 +164,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite { suite.addTestSuite(GridCacheContinuousQueryPartitionedSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryPartitionedOnlySelfTest.class); suite.addTestSuite(GridCacheContinuousQueryPartitionedP2PDisabledSelfTest.class); + suite.addTestSuite(GridCacheContinuousQueryTxSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicNearEnabledSelfTest.class); suite.addTestSuite(GridCacheContinuousQueryAtomicP2PDisabledSelfTest.class);
