ignite-426-2-reb WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/926a0013 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/926a0013 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/926a0013 Branch: refs/heads/ignite-426-2-reb Commit: 926a0013724e1394fd02c601ab57c53aa7f217f8 Parents: b59d02d Author: Tikhonov Nikolay <[email protected]> Authored: Sun Oct 25 17:10:10 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Oct 28 15:26:45 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheMapEntry.java | 69 ++++++++++++++++---- .../cache/GridCacheUpdateAtomicResult.java | 17 ++++- .../dht/atomic/GridDhtAtomicCache.java | 24 ++++++- .../continuous/CacheContinuousQueryManager.java | 6 ++ ...acheContinuousQueryFailoverAbstractTest.java | 8 +-- 5 files changed, 106 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index abed98d..e842f61 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -33,6 +33,7 @@ import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; import org.apache.ignite.cache.CacheMemoryMode; import org.apache.ignite.cache.eviction.EvictableEntry; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -61,6 +62,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3; import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.T3; @@ -1766,6 +1768,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject oldVal; CacheObject updated; + if (!primary) { + int z = 0; + + ++z; + } + GridCacheVersion enqueueVer = null; GridCacheVersionConflictContext<?, ?> conflictCtx = null; @@ -1784,6 +1792,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Object updated0 = null; Long updateIdx0 = null; + CI1<IgniteInternalFuture<Void>> contQryNtf = null; synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -1893,7 +1902,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0); + updateIdx0 == null ? 0 : updateIdx0, + null); } // Will update something. else { @@ -1970,8 +1980,23 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (updateIdx != null) updateIdx0 = updateIdx; - cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false, - updateIdx0, topVer); + final boolean primary0 = primary; + final CacheObject prevVal0 = prevVal; + final CacheObject evtVal0 = evtVal; + final AffinityTopologyVersion topVer0 = topVer; + final long updateIdx00 = updateIdx0; + + contQryNtf = new CI1<IgniteInternalFuture<Void>>() { + @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) { + try { + cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, evtVal0, + prevVal0, primary0, false, updateIdx00, topVer0); + } + catch (IgniteCheckedException e) { + // No-op. + } + } + }; } return new GridCacheUpdateAtomicResult(false, @@ -1983,7 +2008,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0); + updateIdx0 == null ? 0 : updateIdx0, + contQryNtf); } } else @@ -2060,7 +2086,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0); + updateIdx0 == null ? 0 : updateIdx0, + null); } } @@ -2108,7 +2135,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx); + updateIdx0 == null ? 0 : updateIdx, + null); } } else @@ -2209,7 +2237,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0); + updateIdx0 == null ? 0 : updateIdx0, + null); else if (interceptorVal != updated0) { updated0 = cctx.unwrapTemporary(interceptorVal); @@ -2291,7 +2320,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme null, null, false, - updateIdx0 == null ? 0 : updateIdx0); + updateIdx0 == null ? 0 : updateIdx0, + null); } if (writeThrough) @@ -2377,8 +2407,24 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - if (!isNear()) - cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, primary, false, updateIdx0, topVer); + if (!isNear()) { + final boolean primary0 = primary; + final CacheObject oldVal0 = oldVal; + final AffinityTopologyVersion topVer0 = topVer; + final long updateIdx00 = updateIdx0; + + contQryNtf = new CI1<IgniteInternalFuture<Void>>() { + @Override public void apply(IgniteInternalFuture<Void> voidIgniteInternalFuture) { + try { + cctx.continuousQueries().onEntryUpdated(GridCacheMapEntry.this, key, val, oldVal0, primary0, + false, updateIdx00, topVer0); + } + catch (IgniteCheckedException e) { + // No-op. + } + } + }; + } cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2405,7 +2451,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme enqueueVer, conflictCtx, true, - updateIdx0); + updateIdx0, + contQryNtf); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 092d990..9e2aca6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -18,9 +18,12 @@ package org.apache.ignite.internal.processors.cache; import javax.cache.processor.EntryProcessor; + +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteBiTuple; import org.jetbrains.annotations.Nullable; @@ -63,6 +66,9 @@ public class GridCacheUpdateAtomicResult { /** Value computed by entry processor. */ private IgniteBiTuple<Object, Exception> res; + /** Continuous query notify listener. */ + private CI1<IgniteInternalFuture<Void>> contQryNtfy; + /** * Constructor. * @@ -86,7 +92,8 @@ public class GridCacheUpdateAtomicResult { @Nullable GridCacheVersion rmvVer, @Nullable GridCacheVersionConflictContext<?, ?> conflictRes, boolean sndToDht, - long updateIdx) { + long updateIdx, + @Nullable CI1<IgniteInternalFuture<Void>> contQryNtfy) { this.success = success; this.oldVal = oldVal; this.newVal = newVal; @@ -97,6 +104,7 @@ public class GridCacheUpdateAtomicResult { this.conflictRes = conflictRes; this.sndToDht = sndToDht; this.updateIdx = updateIdx; + this.contQryNtfy = contQryNtfy; } /** @@ -170,6 +178,13 @@ public class GridCacheUpdateAtomicResult { return sndToDht; } + /** + * @return Continuous notify closure. + */ + public CI1<IgniteInternalFuture<Void>> contQryNtfy() { + return contQryNtfy; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheUpdateAtomicResult.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 46799d7..c6ab45d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -1765,7 +1765,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); } - GridCacheUpdateAtomicResult updRes = entry.innerUpdate( + final GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, node.id(), locNodeId, @@ -1799,6 +1799,25 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { readersOnly = true; } + if (!primary) { + int z = 0; + + ++z; + } + + if (updRes.contQryNtfy() != null) { + if (primary && dhtFut != null) { + dhtFut.listen(new CI1<IgniteInternalFuture<Void>>() { + @Override public void apply(IgniteInternalFuture<Void> f) { + if (f.isDone() && f.error() == null) + updRes.contQryNtfy().apply(f); + } + }); + } + else + updRes.contQryNtfy().apply(null); + } + if (dhtFut != null) { if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); @@ -2561,6 +2580,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); + if (updRes.contQryNtfy() != null) + updRes.contQryNtfy().apply(null); + entry.onUnlock(); break; // While. http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 14fe195..ecc778b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -189,6 +189,12 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { if (preload && !internal) return; + if (!primary) { + int z = 0; + + ++z; + } + ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol; if (internal) http://git-wip-us.apache.org/repos/asf/ignite/blob/926a0013/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java index 6979f6a..90e21ad 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFailoverAbstractTest.java @@ -27,11 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -93,6 +89,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; import static org.apache.ignite.cache.CacheMode.REPLICATED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; /** * @@ -122,6 +119,7 @@ public abstract class CacheContinuousQueryFailoverAbstractTest extends GridCommo TestCommunicationSpi commSpi = new TestCommunicationSpi(); + commSpi.setSharedMemoryPort(-1); commSpi.setIdleConnectionTimeout(100); cfg.setCommunicationSpi(commSpi);
