IGNITE-426 Implemented review notes.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c59f761 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c59f761 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c59f761 Branch: refs/heads/ignite-426-2-reb Commit: 6c59f76170932b69552f8744369187f88227dfd0 Parents: ecc3216 Author: nikolay_tikhonov <[email protected]> Authored: Wed Oct 28 15:07:31 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Oct 28 16:13:30 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 6 + .../internal/GridMessageListenHandler.java | 6 + .../processors/cache/GridCacheMapEntry.java | 13 +- .../cache/GridCacheUpdateAtomicResult.java | 1 - .../dht/GridDhtPartitionTopologyImpl.java | 3 +- .../distributed/dht/GridDhtTxFinishRequest.java | 4 +- .../dht/atomic/GridDhtAtomicCache.java | 6 +- .../distributed/near/GridNearAtomicCache.java | 2 +- .../CacheContinuousQueryBatchAck.java | 11 +- .../continuous/CacheContinuousQueryEntry.java | 67 +++++- .../continuous/CacheContinuousQueryHandler.java | 220 ++++++++++++++++--- .../continuous/CacheContinuousQueryManager.java | 1 - .../cache/transactions/IgniteTxEntry.java | 16 +- .../continuous/GridContinuousHandler.java | 6 + .../continuous/GridContinuousProcessor.java | 16 +- .../StartRoutineAckDiscoveryMessage.java | 12 +- .../StartRoutineDiscoveryMessage.java | 18 +- ...acheContinuousQueryFailoverAbstractTest.java | 67 +++++- 18 files changed, 387 insertions(+), 88 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index dc3842b..fc65b55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -23,6 +23,7 @@ import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; import java.util.LinkedList; +import java.util.Map; import java.util.Queue; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; @@ -129,6 +130,11 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void updateIdx(Map<Integer, Long> idx) { + // No-op. + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index bddebba..7711843 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; import java.util.Collection; +import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; @@ -102,6 +103,11 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void updateIdx(Map<Integer, Long> 
idx) { + // No-op. + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(UUID nodeId, UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { ctx.io().addUserMessageListener(topic, pred); http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index e842f61..12f9290 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1768,12 +1768,6 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject oldVal; CacheObject updated; - if (!primary) { - int z = 0; - - ++z; - } - GridCacheVersion enqueueVer = null; GridCacheVersionConflictContext<?, ?> conflictCtx = null; @@ -1990,7 +1984,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) { try { cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal0, - prevVal0, primary0, false, updateIdx00, topVer0); + prevVal0, primary0, false, updateIdx00, topVer0); } catch (IgniteCheckedException e) { // No-op. 
@@ -2412,12 +2406,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme final CacheObject oldVal0 = oldVal; final AffinityTopologyVersion topVer0 = topVer; final long updateIdx00 = updateIdx0; + final CacheObject val0 = val; contQryNtf = new CI1<IgniteInternalFuture<Void>>() { @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) { try { - cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val, oldVal0, primary0, - false, updateIdx00, topVer0); + cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val0, oldVal0, + primary0, false, updateIdx00, topVer0); } catch (IgniteCheckedException e) { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 9e2aca6..397024b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache; import javax.cache.processor.EntryProcessor; - import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java index a210a29..d30cc88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java @@ -964,8 +964,7 @@ class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { /** {@inheritDoc} */ @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) @Nullable @Override public GridDhtPartitionMap update(@Nullable GridDhtPartitionExchangeId exchId, - GridDhtPartitionMap parts, - @Nullable Map<Integer, Long> cntrMap) { + GridDhtPartitionMap parts, @Nullable Map<Integer, Long> cntrMap) { if (log.isDebugEnabled()) log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']'); http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java index 18ac921..de6326e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxFinishRequest.java @@ -191,6 +191,7 @@ public class GridDhtTxFinishRequest extends 
GridDistributedTxFinishRequest { * @param subjId Subject ID. * @param taskNameHash Task name hash. * @param updateIdxs Partition update idxs. + * @param addDepInfo Deployment info flag. */ public GridDhtTxFinishRequest( UUID nearNodeId, @@ -215,11 +216,12 @@ public class GridDhtTxFinishRequest extends GridDistributedTxFinishRequest { int txSize, @Nullable UUID subjId, int taskNameHash, + boolean addDepInfo, Collection<Long> updateIdxs ) { this(nearNodeId, futId, miniId, topVer, xidVer, commitVer, threadId, isolation, commit, invalidate, sys, plc, sysInvalidate, syncCommit, syncRollback, baseVer, committedVers, rolledbackVers, pendingVers, txSize, - subjId, taskNameHash); + subjId, taskNameHash, addDepInfo); if (updateIdxs != null && !updateIdxs.isEmpty()) { partUpdateCnt = new GridLongList(updateIdxs.size()); http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index d26ad97..5d64648 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1804,7 +1804,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { dhtFut.listen(new CI1<IgniteInternalFuture<Void>>() { @Override public void apply(IgniteInternalFuture<Void> f) { if (f.isDone() && f.error() == null) - updRes.contQryNtfy().apply(f); + updRes.contQryNtfy().apply(f); } }); } @@ -2557,7 +2557,7 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> { /*event*/true, /*metrics*/true, /*primary*/false, - /*check version*/op != TRANSFORM || !req.forceTransformBackups(), + /*check version*/!req.forceTransformBackups(), req.topologyVersion(), CU.empty0(), replicate ? DR_BACKUP : DR_NONE, @@ -2614,7 +2614,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " + - nodeId); + req.nodeId()); } catch (IgniteCheckedException e) { U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId + http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 4f2caa1..706655b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -353,7 +353,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*event*/true, /*metrics*/true, /*primary*/false, - /*check version*/op != TRANSFORM || !req.forceTransformBackups(), + /*check version*/!req.forceTransformBackups(), req.topologyVersion(), CU.empty0(), DR_NONE, http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java index 1e9a848..f89c466 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryBatchAck.java @@ -97,7 +97,8 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage { writer.incrementState(); case 4: - if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT, MessageCollectionItemType.LONG)) + if (!writer.writeMap("updateIdxs", updateIdxs, MessageCollectionItemType.INT, + MessageCollectionItemType.LONG)) return false; writer.incrementState(); @@ -127,7 +128,8 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage { reader.incrementState(); case 4: - updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, false); + updateIdxs = reader.readMap("updateIdxs", MessageCollectionItemType.INT, MessageCollectionItemType.LONG, + false); if (!reader.isLastRead()) return false; @@ -140,6 +142,11 @@ public class CacheContinuousQueryBatchAck extends GridCacheMessage { } /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** {@inheritDoc} */ @Override public byte directType() { return 114; } http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 896751e..939f7a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; @@ -93,9 +94,11 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** */ @GridToStringInclude - @GridDirectTransient private AffinityTopologyVersion topVer; + /** Filtered events. */ + private GridLongList filteredEvts; + /** * Required by {@link Message}. */ @@ -179,6 +182,10 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { */ void markFiltered() { flags |= FILTERED_ENTRY; + newVal = null; + oldVal = null; + key = null; + depInfo = null; } /** @@ -191,11 +198,25 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** * @return {@code True} if entry was filtered. */ - boolean filtered() { + boolean isFiltered() { return (flags & FILTERED_ENTRY) != 0; } /** + * @param idxs Filtered indexes. + */ + void filteredEvents(GridLongList idxs) { + filteredEvts = idxs; + } + + /** + * @return previous filtered events. + */ + long[] filteredEvents() { + return filteredEvts == null ? null : filteredEvts.array(); + } + + /** * @param cctx Cache context. 
* @throws IgniteCheckedException In case of error. */ @@ -217,13 +238,15 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { * @throws IgniteCheckedException In case of error. */ void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { - key.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (!isFiltered()) { + key.finishUnmarshal(cctx.cacheObjectContext(), ldr); - if (newVal != null) - newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (newVal != null) + newVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); - if (oldVal != null) - oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + if (oldVal != null) + oldVal.finishUnmarshal(cctx.cacheObjectContext(), ldr); + } } /** @@ -322,6 +345,18 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { writer.incrementState(); + case 8: + if (!writer.writeMessage("filteredEvts", filteredEvts)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + } return true; @@ -403,6 +438,22 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { reader.incrementState(); + case 8: + filteredEvts = reader.readMessage("filteredEvts"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } return reader.afterMessageRead(CacheContinuousQueryEntry.class); @@ -410,7 +461,7 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 8; + return 10; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java 
---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index bd44180..8da7ed2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -53,6 +54,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; +import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; import org.apache.ignite.internal.processors.cache.query.CacheQueryType; @@ -60,6 +62,7 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; +import org.apache.ignite.internal.util.GridLongList; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import 
org.apache.ignite.internal.util.typedef.C1; import org.apache.ignite.internal.util.typedef.F; @@ -130,11 +133,17 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { private transient ConcurrentMap<Integer, PartitionRecovery> rcvs; /** */ + private transient ConcurrentMap<Integer, HoleBuffer> snds = new ConcurrentHashMap<>(); + + /** */ private transient AcknowledgeBuffer ackBuf; /** */ private transient int cacheId; + /** */ + private Map<Integer, Long> initUpdIdx; + /** * Required by {@link Externalizable}. */ @@ -187,8 +196,6 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { this.skipPrimaryCheck = skipPrimaryCheck; this.localCache = locCache; - rcvs = new ConcurrentHashMap<>(); - cacheId = CU.cacheId(cacheName); } @@ -213,6 +220,11 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public void updateIdx(Map<Integer, Long> idx) { + this.initUpdIdx = idx; + } + + /** {@inheritDoc} */ @Override public RegisterStatus register(final UUID nodeId, final UUID routineId, final GridKernalContext ctx) throws IgniteCheckedException { assert nodeId != null; @@ -229,6 +241,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { ackBuf = new AcknowledgeBuffer(); + rcvs = new ConcurrentHashMap<>(); + final boolean loc = nodeId.equals(ctx.localNodeId()); assert !skipPrimaryCheck || loc; @@ -253,8 +267,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } - @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, - boolean primary, + @Override public void onEntryUpdated(CacheContinuousQueryEvent<K, V> evt, boolean primary, boolean recordIgniteEvt) { if (ignoreExpired && evt.getEventType() == EventType.EXPIRED) return; @@ -288,7 +301,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (primary || skipPrimaryCheck) { if (loc) { if (!localCache) { - 
Collection<CacheContinuousQueryEntry> entries = handleEntry(ctx, entry); + Collection<CacheContinuousQueryEntry> entries = clientHandleEvent(ctx, entry); if (!entries.isEmpty()) { final IgniteCache cache = cctx.kernalContext().cache().jcache(cctx.name()); @@ -302,7 +315,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { }, new IgnitePredicate<CacheContinuousQueryEntry>() { @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.filtered(); + return !entry.isFiltered(); } } ); @@ -314,14 +327,18 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } else { - if (!entry.filtered()) + if (!entry.isFiltered()) locLsnr.onUpdated(F.<CacheEntryEvent<? extends K, ? extends V>>asList(evt)); } } else { - prepareEntry(cctx, nodeId, entry); + if (!entry.isFiltered()) + prepareEntry(cctx, nodeId, entry); - ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); + CacheContinuousQueryEntry e = handleEntry(entry); + + if (e != null) + ctx.continuous().addNotification(nodeId, routineId, entry, topic, sync, true); } } else { @@ -388,8 +405,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { try { GridCacheContext<K, V> cctx = cacheContext(ctx); - for (CacheContinuousQueryEntry e : backupQueue) - prepareEntry(cctx, nodeId, e); + for (CacheContinuousQueryEntry e : backupQueue) { + if (!e.isFiltered()) + prepareEntry(cctx, nodeId, e); + } ctx.continuous().addBackupNotification(nodeId, routineId, backupQueue, topic); @@ -514,7 +533,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { Collection<CacheContinuousQueryEntry> entries0 = new ArrayList<>(); for (CacheContinuousQueryEntry e : entries) - entries0.addAll(handleEntry(ctx, e)); + entries0.addAll(clientHandleEvent(ctx, e)); Iterable<CacheEntryEvent<? extends K, ? extends V>> evts = F.viewReadOnly(entries0, new C1<CacheContinuousQueryEntry, CacheEntryEvent<? extends K, ? 
extends V>>() { @@ -524,7 +543,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { }, new IgnitePredicate<CacheContinuousQueryEntry>() { @Override public boolean apply(CacheContinuousQueryEntry entry) { - return !entry.filtered(); + return !entry.isFiltered(); } } ); @@ -537,7 +556,8 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { * @param e entry. * @return Entry collection. */ - private Collection<CacheContinuousQueryEntry> handleEntry(GridKernalContext ctx, CacheContinuousQueryEntry e) { + private Collection<CacheContinuousQueryEntry> clientHandleEvent(GridKernalContext ctx, + CacheContinuousQueryEntry e) { assert e != null; // Initial query entry or evicted entry. @@ -548,7 +568,7 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { PartitionRecovery rec = rcvs.get(e.partition()); if (rec == null) { - rec = new PartitionRecovery(ctx.log(getClass())); + rec = new PartitionRecovery(ctx.log(getClass()), cacheContext(ctx), initUpdIdx.get(e.partition())); PartitionRecovery oldRec = rcvs.putIfAbsent(e.partition(), rec); @@ -560,26 +580,65 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } /** + * @param e Entry. + * @return Entry. + */ + private CacheContinuousQueryEntry handleEntry(CacheContinuousQueryEntry e) { + assert e != null; + assert snds != null; + + // Initial query entry. + // These events should be fired immediately. + if (e.updateIndex() == -1) + return e; + + HoleBuffer buf = snds.get(e.partition()); + + if (buf == null) { + buf = new HoleBuffer(); + + HoleBuffer oldRec = snds.putIfAbsent(e.partition(), buf); + + if (oldRec != null) + buf = oldRec; + } + + return buf.handle(e); + } + + /** * */ private static class PartitionRecovery { + /** Event which means hole in sequence. 
*/ + private static final CacheContinuousQueryEntry HOLE = new CacheContinuousQueryEntry(); + /** */ private IgniteLogger log; /** */ - private static final long INIT_VALUE = -100; + private GridCacheContext cctx; + + /** */ + private long lastFiredEvt; /** */ - private long lastFiredEvt = INIT_VALUE; + private AffinityTopologyVersion curTop; /** */ - private final Map<Long, CacheContinuousQueryEntry> pendingEnts = new TreeMap<>(); + private final Map<Long, CacheContinuousQueryEntry> pendingEvts = new TreeMap<>(); /** * @param log Logger. */ - public PartitionRecovery(IgniteLogger log) { + public PartitionRecovery(IgniteLogger log, GridCacheContext cctx, Long initIdx) { this.log = log; + this.cctx = cctx; + + if (initIdx != null) { + this.lastFiredEvt = initIdx; + this.curTop = cctx.topology().topologyVersion(); + } } /** @@ -593,26 +652,55 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { List<CacheContinuousQueryEntry> entries; - synchronized (pendingEnts) { + synchronized (pendingEvts) { // Received first event. - if (lastFiredEvt == INIT_VALUE) { + if (curTop == null) { lastFiredEvt = entry.updateIndex(); + curTop = entry.topologyVersion(); + return F.asList(entry); } - // Handle case when nodes owning partition left from topology. 
- if (entry.updateIndex() == 1 && !entry.isBackup()) { - pendingEnts.clear(); + if (curTop.compareTo(entry.topologyVersion()) < 0) { + GridCacheAffinityManager aff = cctx.affinity(); - lastFiredEvt = 1; + if (cctx.affinity().backups(entry.partition(), entry.topologyVersion()).isEmpty() && + !aff.primary(entry.partition(), curTop).id().equals(aff.primary(entry.partition(), + entry.topologyVersion()).id())) { + entries = new ArrayList<>(pendingEvts.size()); - return F.asList(entry); + for (CacheContinuousQueryEntry evt : pendingEvts.values()) { + if (evt != HOLE && !evt.isFiltered()) + entries.add(evt); + } + + pendingEvts.clear(); + + curTop = entry.topologyVersion(); + + lastFiredEvt = entry.updateIndex(); + + entries.add(entry); + + return entries; + } + + curTop = entry.topologyVersion(); } // Check duplicate. - if (entry.updateIndex() > lastFiredEvt) - pendingEnts.put(entry.updateIndex(), entry); + if (entry.updateIndex() > lastFiredEvt) { + pendingEvts.put(entry.updateIndex(), entry); + + // Put filtered events. 
+ if (entry.filteredEvents() != null) { + for (long idx : entry.filteredEvents()) { + if (idx > lastFiredEvt) + pendingEvts.put(idx, HOLE); + } + } + } else { if (log.isDebugEnabled()) log.debug("Skip duplicate continuous query message: " + entry); @@ -620,10 +708,10 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { return Collections.emptyList(); } - if (pendingEnts.isEmpty()) + if (pendingEvts.isEmpty()) return Collections.emptyList(); - Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEnts.entrySet().iterator(); + Iterator<Map.Entry<Long, CacheContinuousQueryEntry>> iter = pendingEvts.entrySet().iterator(); entries = new ArrayList<>(); @@ -634,10 +722,13 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { if (e.getKey() == lastFiredEvt + 1) { ++lastFiredEvt; - entries.add(e.getValue()); + if (e.getValue() != HOLE && !e.getValue().isFiltered()) + entries.add(e.getValue()); iter.remove(); } + else + break; } } @@ -645,6 +736,73 @@ class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler { } } + /** + * + */ + private static class HoleBuffer { + /** */ + private final TreeSet<Long> buf = new TreeSet<>(); + + /** */ + private long lastFiredEvt; + + /** + * Add continuous entry. + * + * @param e Cache continuous query entry. + * @return Collection entries which will be fired. + */ + public CacheContinuousQueryEntry handle(CacheContinuousQueryEntry e) { + assert e != null; + + synchronized (buf) { + // Handle filtered events. + if (e.isFiltered()) { + if (lastFiredEvt > e.updateIndex() || e.updateIndex() == 1) + return e; + + buf.add(e.updateIndex()); + + return null; + } + else { + if (lastFiredEvt < e.updateIndex()) + lastFiredEvt = e.updateIndex(); + + // Doesn't have filtered and delayed events. 
+ if (buf.isEmpty() || buf.first() > e.updateIndex()) + return e; + else { + GridLongList filteredEvts = new GridLongList(buf.size()); + int size = 0; + + Iterator<Long> iter = buf.iterator(); + + while (iter.hasNext()) { + long idx = iter.next(); + + if (idx < e.updateIndex()) { + filteredEvts.add(idx); + + iter.remove(); + + ++size; + } + else + break; + } + + filteredEvts.truncate(size, true); + + e.filteredEvents(filteredEvts); + + return e; + } + } + } + } + } + /** {@inheritDoc} */ @Override public void p2pMarshal(GridKernalContext ctx) throws IgniteCheckedException { assert ctx != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 14fe195..bdd009a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -256,7 +256,6 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { throws IgniteCheckedException { assert e != null; assert key != null; - assert Thread.holdsLock(e) : e; if (e.isInternal()) return; http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java index f5cf501..7d47b3b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxEntry.java @@ -182,6 +182,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { private byte flags; /** Partition update index. */ + @GridDirectTransient private long partIdx; /** */ @@ -953,11 +954,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { writer.incrementState(); - case 12: - if (!writer.writeLong("partIdx", partIdx)) - return false; - - writer.incrementState(); } return true; @@ -1067,14 +1063,6 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { reader.incrementState(); - case 12: - partIdx = reader.readLong("partIdx"); - - if (!reader.isLastRead()) - return false; - - reader.incrementState(); - } return reader.afterMessageRead(IgniteTxEntry.class); @@ -1087,7 +1075,7 @@ public class IgniteTxEntry implements GridPeerDeployAware, Message { /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 13; + return 12; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java index 40fb12a..648ed7b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousHandler.java @@ -19,6 +19,7 @@ 
package org.apache.ignite.internal.processors.continuous; import java.io.Externalizable; import java.util.Collection; +import java.util.Map; import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.GridKernalContext; @@ -145,4 +146,9 @@ public interface GridContinuousHandler extends Externalizable, Cloneable { * @return Cache name if this is a continuous query handler. */ public String cacheName(); + + /** + * @param idx Init state for partition indexes. + */ + public void updateIdx(Map<Integer, Long> idx); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 3ed186e..c63a82f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -205,8 +205,14 @@ public class GridContinuousProcessor extends GridProcessorAdapter { StartFuture fut = startFuts.remove(msg.routineId()); if (fut != null) { - if (msg.errs().isEmpty()) + if (msg.errs().isEmpty()) { + LocalRoutineInfo routine = locInfos.get(msg.routineId()); + + if (routine != null) + routine.handler().updateIdx(msg.updateIdxs()); + fut.onRemoteRegistered(); + } else { IgniteCheckedException firstEx = F.first(msg.errs().values()); @@ -685,7 +691,7 @@ public class GridContinuousProcessor extends GridProcessorAdapter { */ public void addNotification(UUID nodeId, final UUID routineId, - Object obj, + @Nullable Object obj, @Nullable Object orderedTopic, boolean sync, 
boolean msg) @@ -856,6 +862,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } } + if (ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) { + Map<Integer, Long> idx = ctx.cache().internalCache(hnd.cacheName()).context().topology().updateCounters(); + + req.addUpdateIdxs(idx); + } + if (err != null) req.addError(ctx.localNodeId(), err); http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java index bd4aae3..0b5cfaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineAckDiscoveryMessage.java @@ -35,14 +35,19 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { /** */ private final Map<UUID, IgniteCheckedException> errs; + /** */ + private final Map<Integer, Long> updateIdxs; + /** * @param routineId Routine id. * @param errs Errs. */ - public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs) { + public StartRoutineAckDiscoveryMessage(UUID routineId, Map<UUID, IgniteCheckedException> errs, + Map<Integer, Long> idx) { super(routineId); this.errs = new HashMap<>(errs); + this.updateIdxs = idx; } /** {@inheritDoc} */ @@ -50,6 +55,11 @@ public class StartRoutineAckDiscoveryMessage extends AbstractContinuousMessage { return null; } + /** {@inheritDoc} */ + public Map<Integer, Long> updateIdxs() { + return updateIdxs; + } + /** * @return Errs. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java index 892adac..cfacde4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/StartRoutineDiscoveryMessage.java @@ -37,6 +37,9 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** */ private final Map<UUID, IgniteCheckedException> errs = new HashMap<>(); + /** */ + private final Map<Integer, Long> updateIdxes = new HashMap<>(); + /** * @param routineId Routine id. * @param startReqData Start request data. @@ -63,6 +66,19 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { } /** + * @param idx Update indexes. + */ + public void addUpdateIdxs(Map<Integer, Long> idx) { + for (Map.Entry<Integer, Long> e : idx.entrySet()) { + Long cntr0 = updateIdxes.get(e.getKey()); + Long cntr1 = e.getValue(); + + if (cntr0 == null || cntr1 > cntr0) + updateIdxes.put(e.getKey(), cntr1); + } + } + + /** * @return Errs. 
*/ public Map<UUID, IgniteCheckedException> errs() { @@ -76,7 +92,7 @@ public class StartRoutineDiscoveryMessage extends AbstractContinuousMessage { /** {@inheritDoc} */ @Override public DiscoveryCustomMessage ackMessage() { - return new StartRoutineAckDiscoveryMessage(routineId, errs); + return new StartRoutineAckDiscoveryMessage(routineId, errs, updateIdxes); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/6c59f761/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index 90e21ad..0a95036 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -27,7 +27,11 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.*; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -86,10 +90,9 @@ import org.apache.ignite.transactions.Transaction; import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; -import static 
org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; +import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.PRIMARY; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; /** * @@ -172,7 +175,45 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @return Write order mode for atomic cache. */ protected CacheAtomicWriteOrderMode writeOrderMode() { - return CLOCK; + return PRIMARY; + } + + /** + * @throws Exception If failed. + */ + public void testFirstFilteredEvent() throws Exception { + this.backups = 2; + + final int SRV_NODES = 4; + + startGridsMultiThreaded(SRV_NODES); + + client = true; + + Ignite qryClient = startGrid(SRV_NODES); + + client = false; + + IgniteCache<Object, Object> qryClnCache = qryClient.cache(null); + + final CacheEventListener3 lsnr = new CacheEventListener3(); + + ContinuousQuery<Object, Object> qry = new ContinuousQuery<>(); + + qry.setLocalListener(lsnr); + + qry.setRemoteFilter(new CacheEventFilter()); + + try (QueryCursor<?> cur = qryClnCache.query(qry)) { + List<Integer> keys = testKeys(grid(0).cache(null), 1); + + for (Integer key : keys) + qryClnCache.put(key, -1); + + qryClnCache.put(keys.get(0), 100); + } + + assertEquals(lsnr.evts.size(), 1); } /** @@ -1222,7 +1263,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo startGrid(idx); - Thread.sleep(3000); + Thread.sleep(200); log.info("Stop node: " + idx); @@ -1435,7 +1476,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo startGrid(idx); - Thread.sleep(3000); + Thread.sleep(200); log.info("Stop node: " + idx); @@ -1591,7 +1632,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo * @throws Exception If failed. 
*/ public void testFailoverStartStopBackup() throws Exception { - failoverStartStopFilter(atomicityMode() == CacheAtomicityMode.ATOMIC ? 1 : 2); + failoverStartStopFilter(2); } /** @@ -1698,7 +1739,9 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo stopGrid(idx); - Thread.sleep(100); + awaitPartitionMapExchange(); + + Thread.sleep(200); log.info("Start node: " + idx); @@ -1706,6 +1749,8 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo CountDownLatch latch = new CountDownLatch(1); + awaitPartitionMapExchange(); + assertTrue(checkLatch.compareAndSet(null, latch)); if (!stop.get()) { @@ -1728,7 +1773,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo final List<T3<Object, Object, Object>> expEvtsLsnr = new ArrayList<>(); try { - long stopTime = System.currentTimeMillis() + 10_000; + long stopTime = System.currentTimeMillis() + 60_000; // Start new filter each 5 sec. long startFilterTime = System.currentTimeMillis() + 5_000; @@ -1752,7 +1797,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo if (dinQry != null) { dinQry.close(); - log.error("Continuous query listener closed."); + log.info("Continuous query listener closed."); checkEvents(expEvtsNewLsnr, dinLsnr, backups == 0); } @@ -1767,7 +1812,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo dinQry = qryClnCache.query(newQry); - log.error("Continuous query listener started."); + log.info("Continuous query listener started."); startFilterTime = System.currentTimeMillis() + 5_000; }
