Repository: ignite Updated Branches: refs/heads/ignite-426-2-reb b133235de -> 9dd18c735
IGNITE-426 WIP Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/9dd18c73 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/9dd18c73 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/9dd18c73 Branch: refs/heads/ignite-426-2-reb Commit: 9dd18c735ebee3dfcc17098bf2fe227d6033176d Parents: b133235 Author: nikolay_tikhonov <[email protected]> Authored: Fri Oct 30 21:12:02 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Fri Oct 30 21:12:02 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 61 ++------ .../cache/GridCacheUpdateAtomicResult.java | 14 +- .../dht/atomic/GridDhtAtomicCache.java | 22 +-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 29 ++++ .../preloader/GridDhtPartitionsFullMessage.java | 13 +- .../GridDhtPartitionsSingleMessage.java | 13 +- .../continuous/CacheContinuousQueryHandler.java | 1 + .../continuous/CacheContinuousQueryManager.java | 18 +-- ...acheContinuousQueryFailoverAbstractTest.java | 152 ++++++++++++++++++- 9 files changed, 223 insertions(+), 100 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 12f9290..49899cd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1786,7 +1786,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Object updated0 = null; Long updateIdx0 = null; - CI1<IgniteInternalFuture<Void>> contQryNtf = null; synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -1896,8 +1895,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0, - null); + updateIdx0 == null ? 0 : updateIdx0); } // Will update something. else { @@ -1974,23 +1972,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (updateIdx != null) updateIdx0 = updateIdx; - final boolean primary0 = primary; - final CacheObject prevVal0 = prevVal; - final CacheObject evtVal0 = evtVal; - final AffinityTopologyVersion topVer0 = topVer; - final long updateIdx00 = updateIdx0; - - contQryNtf = new CI1<IgniteInternalFuture<Void>>() { - @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) { - try { - cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal0, - prevVal0, primary0, false, updateIdx00, topVer0); - } - catch (IgniteCheckedException e) { - // No-op. - } - } - }; + cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal, + prevVal, primary, false, updateIdx0, topVer); } return new GridCacheUpdateAtomicResult(false, @@ -2002,8 +1985,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0, - contQryNtf); + updateIdx0 == null ? 0 : updateIdx0); } } else @@ -2080,8 +2062,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0, - null); + updateIdx0 == null ? 0 : updateIdx0); } } @@ -2129,8 +2110,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx, - null); + updateIdx0 == null ? 0 : updateIdx); } } else @@ -2231,8 +2211,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0, - null); + updateIdx0 == null ? 0 : updateIdx0); else if (interceptorVal != updated0) { updated0 = cctx.unwrapTemporary(interceptorVal); @@ -2314,8 +2293,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0, - null); + updateIdx0 == null ? 0 : updateIdx0); } if (writeThrough) @@ -2401,26 +2379,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - if (!isNear()) { - final boolean primary0 = primary; - final CacheObject oldVal0 = oldVal; - final AffinityTopologyVersion topVer0 = topVer; - final long updateIdx00 = updateIdx0; - final CacheObject val0 = val; - - contQryNtf = new CI1<IgniteInternalFuture<Void>>() { - @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) { - try { - cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val0, oldVal0, - primary0, false, updateIdx00, topVer0); - } - catch (IgniteCheckedException e) { - // No-op. - } - } - }; - } - cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); if (intercept) { @@ -2446,8 +2404,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme enqueueVer, conflictCtx, true, - updateIdx0, - contQryNtf); + updateIdx0); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 397024b..437f9f1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -65,9 +65,6 @@ public class GridCacheUpdateAtomicResult { /** Value computed by entry processor. */ private IgniteBiTuple<Object, Exception> res; - /** Continuous query notify listener. */ - private CI1<IgniteInternalFuture<Void>> contQryNtfy; - /** * Constructor. * @@ -91,8 +88,7 @@ public class GridCacheUpdateAtomicResult { @Nullable GridCacheVersion rmvVer, @Nullable GridCacheVersionConflictContext<?, ?> conflictRes, boolean sndToDht, - long updateIdx, - @Nullable CI1<IgniteInternalFuture<Void>> contQryNtfy) { + long updateIdx) { this.success = success; this.oldVal = oldVal; this.newVal = newVal; @@ -103,7 +99,6 @@ public class GridCacheUpdateAtomicResult { this.conflictRes = conflictRes; this.sndToDht = sndToDht; this.updateIdx = updateIdx; - this.contQryNtfy = contQryNtfy; } /** @@ -177,13 +172,6 @@ public class GridCacheUpdateAtomicResult { return sndToDht; } - /** - * @return Continuous notify closure. - */ - public CI1<IgniteInternalFuture<Void>> contQryNtfy() { - return contQryNtfy; - } - /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheUpdateAtomicResult.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 5d64648..d64e2c0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1799,19 +1799,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { readersOnly = true; } - if (updRes.contQryNtfy() != null) { - if (primary && dhtFut != null) { - dhtFut.listen(new CI1<IgniteInternalFuture<Void>>() { - @Override public void apply(IgniteInternalFuture<Void> f) { - if (f.isDone() && f.error() == null) - updRes.contQryNtfy().apply(f); - } - }); - } - else - updRes.contQryNtfy().apply(null); - } - if (dhtFut != null) { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); @@ -1849,6 +1836,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); } } + else if (!entry.isNear()) { + ctx.continuousQueries().onEntryUpdated(entry, entry.key(), updRes.newValue(), updRes.oldValue(), + primary, false, updRes.updateIdx(), topVer); + } if (hasNear) { if (primary && updRes.sendToDht()) { @@ -2574,8 +2565,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); - if (updRes.contQryNtfy() != null) - updRes.contQryNtfy().apply(null); + if (updRes.success() && !entry.isNear()) + ctx.continuousQueries().onEntryUpdated(entry, entry.key(), updRes.newValue(), + updRes.oldValue(), false, false, updRes.updateIdx(), req.topologyVersion()); entry.onUnlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index d9c12eb..61374cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentMap; @@ -44,6 +45,7 @@ import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T4; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -98,6 +100,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Future keys. */ private Collection<KeyCacheObject> keys; + /** Updates. */ + private List<T4<GridDhtCacheEntry, CacheObject, CacheObject, Long>> updates; + /** */ private boolean waitForExchange; @@ -129,6 +134,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> keys = new ArrayList<>(updateReq.keys().size()); + updates = new ArrayList<>(updateReq.keys().size()); + boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()); waitForExchange = !topLocked; @@ -222,6 +229,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> keys.add(entry.key()); + updates.add(new T4<>(entry, val, prevVal, updateIdx)); + for (ClusterNode node : dhtNodes) { UUID nodeId = node.id(); @@ -326,6 +335,26 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> for (KeyCacheObject key : keys) updateRes.addFailedKey(key, err); } + else { + assert keys.size() == updates.size(); + + int i = 0; + + for (KeyCacheObject key : keys) { + T4<GridDhtCacheEntry, CacheObject, CacheObject, Long> upd = updates.get(i); + + try { + cctx.continuousQueries().onEntryUpdated(upd.get1(), key, upd.get2(), upd.get3(), true, false, + upd.get4(), updateRes.topologyVersion()); + } + catch (IgniteCheckedException e) { + log.warning("Failed to send continuous query message. [key=" + key + ", newVal=" + + upd.get1() + ", err=" + e + "]"); + } + + ++i; + } + } if (updateReq.writeSynchronizationMode() == FULL_SYNC) completionCb.apply(updateReq, updateRes); http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index 758818d..3f4f9bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -52,7 +52,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** Partitions update counters. */ @GridToStringInclude @GridDirectTransient - private Map<Integer, Map<Integer, Long>> partCntrs = new HashMap<>(); + private Map<Integer, Map<Integer, Long>> partCntrs; /** Serialized partitions counters. */ private byte[] partCntrsBytes; @@ -106,6 +106,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @param cntrMap Partition update counters. */ public void addPartitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) { + if (partCntrs == null) + partCntrs = new HashMap<>(); + if (!partCntrs.containsKey(cacheId)) partCntrs.put(cacheId, cntrMap); } @@ -115,9 +118,13 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa * @return Partition update counters. */ public Map<Integer, Long> partitionUpdateCounters(int cacheId) { - Map<Integer, Long> res = partCntrs.get(cacheId); + if (partCntrs != null) { + Map<Integer, Long> res = partCntrs.get(cacheId); + + return res != null ? res : Collections.<Integer, Long>emptyMap(); + } - return res != null ? res : Collections.<Integer, Long>emptyMap(); + return Collections.emptyMap(); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 547c0f6..a2366bf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -50,7 +50,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Partitions update counters. */ @GridToStringInclude @GridDirectTransient - private Map<Integer, Map<Integer, Long>> partCntrs = new HashMap<>(); + private Map<Integer, Map<Integer, Long>> partCntrs; /** Serialized partitions counters. */ private byte[] partCntrsBytes; @@ -103,6 +103,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @param cntrMap Partition update counters. */ public void partitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) { + if (partCntrs == null) + partCntrs = new HashMap<>(); + partCntrs.put(cacheId, cntrMap); } @@ -111,9 +114,13 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes * @return Partition update counters. */ public Map<Integer, Long> partitionUpdateCounters(int cacheId) { - Map<Integer, Long> res = partCntrs.get(cacheId); + if (partCntrs != null) { + Map<Integer, Long> res = partCntrs.get(cacheId); + + return res != null ? res : Collections.<Integer, Long>emptyMap(); + } - return res != null ? res : Collections.<Integer, Long>emptyMap(); + return Collections.emptyMap(); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 1240ad1..cb0ba5a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -785,6 +785,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { return e; else { GridLongList filteredEvts = new GridLongList(buf.size()); + int size = 0; Iterator<Long> iter = buf.iterator(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index bdd009a..9912040 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -282,15 +282,15 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { initialized = true; } - CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( - cctx.cacheId(), - EXPIRED, - key, - null, - lsnr.oldValueRequired() ? oldVal : null, - e.partition(), - -1, - null); + CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( + cctx.cacheId(), + EXPIRED, + key, + null, + lsnr.oldValueRequired() ? oldVal : null, + e.partition(), + -1, + null); CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent( cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); http://git-wip-us.apache.org/repos/asf/ignite/blob/9dd18c73/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index 049d838..b31b842 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -30,7 +30,11 @@ import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -87,6 +91,7 @@ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; +import org.eclipse.jetty.util.ConcurrentHashSet; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; @@ -895,7 +900,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo */ private void checkEvents(final List<T3<Object, Object, Object>> expEvts, final CacheEventListener2 lsnr, boolean lostAllow) throws Exception { - GridTestUtils.waitForCondition(new PA() { + boolean b = GridTestUtils.waitForCondition(new PA() { @Override public boolean apply() { return expEvts.size() == lsnr.size(); } @@ -919,7 +924,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo while (iter.hasNext()) { CacheEntryEvent<?, ?> e = iter.next(); - if ((exp.get2() != null && e.getValue() != null && exp.get2() == e.getValue()) + if ((exp.get2() != null && e.getValue() != null && exp.get2().equals(e.getValue())) && equalOldValue(e, exp)) { found = true; @@ -945,7 +950,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo for (T3<Object, Object, Object> lostEvt : lostEvents) { if (e.getKey().equals(lostEvt.get1()) && e.getValue().equals(lostEvt.get2()) - && equalOldValue(e, lostEvt)) { + /*&& equalOldValue(e, lostEvt)*/) { found = true; lostEvents.remove(lostEvt); @@ -972,7 +977,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo log.error("Duplicate event: " + e); } - assertFalse("Received duplicate events, see log for details.", dup); + assertFalse("Received duplicate events, see log for details.", !lostEvents.isEmpty()); } if (!lostAllow && !lostEvents.isEmpty()) { @@ -989,6 +994,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo expEvts.clear(); lsnr.evts.clear(); + lsnr.vals.clear(); } /** @@ -1658,7 +1664,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo QueryCursor<?> cur = qryClnCache.query(qry); - for (int i = 0; i < 20; i++) { + for (int i = 0; i < 10; i++) { final int idx = i % (SRV_NODES - 1); log.info("Stop node: " + idx); @@ -1933,6 +1939,142 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo /** * @throws Exception If failed. */ + public void testMultiThreadedFailover() throws Exception { + this.backups = 2; + + final int SRV_NODES = 4; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + final Ignite qryCln = startGrid(SRV_NODES); + + client = false; + + final IgniteCache<Object, Object> qryClnCache = qryCln.cache(null); + + final CacheEventListener2 lsnr = new CacheEventListener2(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + QueryCursor<?> cur = qryClnCache.query(qry); + + final AtomicBoolean stop = new AtomicBoolean(); + + final int THREAD = 4; + + final int PARTS = THREAD; + + final List<T3<Object, Object, Object>> expEvts = new CopyOnWriteArrayList<>(); + + final AtomicReference<CyclicBarrier> checkBarrier = new AtomicReference<>(); + + IgniteInternalFuture<?> restartFut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + final int idx = SRV_NODES + 1; + + while (!stop.get() && !err) { + log.info("Start node: " + idx); + + startGrid(idx); + + awaitPartitionMapExchange(); + + Thread.sleep(100); + + try { + log.info("Stop node: " + idx); + + stopGrid(idx); + + awaitPartitionMapExchange(); + + Thread.sleep(100); + } + catch (Exception e) { + log.warning("Failed to stop nodes.", e); + } + + CyclicBarrier bar = new CyclicBarrier(THREAD + 1 /* plus start/stop thread */, new Runnable() { + @Override public void run() { + try { + checkEvents(expEvts, lsnr, false); + } + catch (Exception e) { + log.error("Failed.", e); + + err = true; + + stop.set(true); + } + finally { + checkBarrier.set(null); + } + } + }); + + assertTrue(checkBarrier.compareAndSet(null, bar)); + + if (stop.get() && !err) + bar.await(5, SECONDS); + } + + return null; + } + }); + + final long stopTime = System.currentTimeMillis() + 60_000; + + final AtomicInteger valCntr = new AtomicInteger(0); + + GridTestUtils.runMultiThreaded(new Runnable() { + final ThreadLocalRandom rnd = ThreadLocalRandom.current(); + + @Override public void run() { + try { + while (System.currentTimeMillis() < stopTime && !stop.get() && !err) { + Integer key = rnd.nextInt(PARTS); + + Integer val = valCntr.incrementAndGet(); + + Integer prevVal = (Integer)qryClnCache.getAndPut(key, val); + + expEvts.add(new T3<>((Object)key, (Object)val, (Object)prevVal)); + + CyclicBarrier bar = checkBarrier.get(); + + if (bar != null) + bar.await(); + } + } + catch (Exception e){ + log.error("Failed.", e); + + err = true; + + stop.set(true); + } + finally { + stop.set(true); + } + } + }, THREAD, "update-thread"); + + restartFut.get(); + + checkEvents(expEvts, lsnr, true); + + cur.close(); + + assertFalse("Unexpected error during test, see log for details.", err); + } + + /** + * @throws Exception If failed. + */ public void testMultiThreaded() throws Exception { this.backups = 2;
