ignite-2587 Fixed continuous query notifications in offheap mode and BinaryObjectOffheapImpl usage.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/4c05fc02 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/4c05fc02 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/4c05fc02 Branch: refs/heads/ignite-961 Commit: 4c05fc0254f446ef040f5d22a066a0d4916a589e Parents: 0b47d5c Author: sboikov <[email protected]> Authored: Wed Feb 10 14:07:40 2016 +0300 Committer: sboikov <[email protected]> Committed: Wed Feb 10 14:07:40 2016 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheLazyEntry.java | 3 + .../processors/cache/GridCacheContext.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 118 +++- .../binary/CacheObjectBinaryProcessorImpl.java | 6 +- .../dht/atomic/GridDhtAtomicCache.java | 79 ++- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 85 ++- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 38 +- .../cache/query/GridCacheQueryManager.java | 30 +- .../continuous/CacheContinuousQueryHandler.java | 3 +- .../CacheContinuousQueryListener.java | 2 +- .../continuous/CacheContinuousQueryManager.java | 120 +++- .../continuous/GridContinuousProcessor.java | 16 +- .../IgniteCacheEntryListenerAbstractTest.java | 454 ++++++++---- ...cheEntryListenerAtomicOffheapTieredTest.java | 32 + ...cheEntryListenerAtomicOffheapValuesTest.java | 32 + ...teCacheEntryListenerTxOffheapTieredTest.java | 32 + ...teCacheEntryListenerTxOffheapValuesTest.java | 32 + .../cache/IgniteCacheEntryListenerTxTest.java | 1 + ...ContinuousQueryFailoverAbstractSelfTest.java | 10 + ...tomicPrimaryWriteOrderOffheapTieredTest.java | 33 + ...tinuousQueryFailoverTxOffheapTieredTest.java | 32 + ...acheContinuousQueryRandomOperationsTest.java | 684 +++++++++++++++++++ ...ridCacheContinuousQueryAbstractSelfTest.java | 19 +- ...eContinuousQueryAtomicOffheapTieredTest.java | 32 + ...eContinuousQueryAtomicOffheapValuesTest.java | 32 + ...CacheContinuousQueryTxOffheapTieredTest.java | 32 + ...CacheContinuousQueryTxOffheapValuesTest.java | 32 + .../junits/common/GridCommonAbstractTest.java | 2 +- .../ignite/testsuites/IgniteCacheTestSuite.java | 8 + .../IgniteCacheQuerySelfTestSuite.java | 14 + 30 files changed, 1743 insertions(+), 274 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java index 05a6fef..30933e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheLazyEntry.java @@ -50,6 +50,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { * @param cctx Cache context. * @param keyObj Key cache object. * @param valObj Cache object value. + * @param keepBinary Keep binary flag. */ public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, CacheObject valObj, boolean keepBinary) { this.cctx = cctx; @@ -61,6 +62,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { /** * @param keyObj Key cache object. * @param val Value. + * @param keepBinary Keep binary flag. * @param cctx Cache context. */ public CacheLazyEntry(GridCacheContext cctx, KeyCacheObject keyObj, V val, boolean keepBinary) { @@ -75,6 +77,7 @@ public class CacheLazyEntry<K, V> implements Cache.Entry<K, V> { * @param keyObj Key cache object. * @param key Key value. * @param valObj Cache object + * @param keepBinary Keep binary flag. * @param val Cache value. */ public CacheLazyEntry(GridCacheContext<K, V> ctx, http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index e875df0..5729959 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -1729,10 +1729,10 @@ public class GridCacheContext<K, V> implements Externalizable { * @return Heap-based object. */ @Nullable public <T> T unwrapTemporary(@Nullable Object obj) { - if (!offheapTiered()) + if (!useOffheapEntry()) return (T)obj; - return (T) cacheObjects().unwrapTemporary(this, obj); + return (T)cacheObjects().unwrapTemporary(this, obj); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index ae40295..9336e0a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.cache; import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; +import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import javax.cache.Cache; @@ -44,6 +45,7 @@ import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtr import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheTtlEntryExtras; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; @@ -1122,7 +1124,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert newVer != null : "Failed to get write version for tx: " + tx; - old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : this.val; + boolean internal = isInternal() || !context().userCache(); + + Map<UUID, CacheContinuousQueryListener> lsnrCol = + notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; + + old = (retval || intercept || lsnrCol != null) ? + rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : this.val; if (intercept) { val0 = CU.value(val, cctx, false); @@ -1206,10 +1214,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme keepBinary); } - if (cctx.isLocal() || cctx.isReplicated() || - (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()))) - cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(), - partition(), tx.local(), false, updateCntr0, topVer); + if (lsnrCol != null) { + cctx.continuousQueries().onEntryUpdated( + lsnrCol, + key, + val, + old, + internal, + partition(), + tx.local(), + false, + updateCntr0, + topVer); + } cctx.dataStructures().onEntryUpdated(key, false, keepBinary); } @@ -1304,7 +1321,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme newVer = explicitVer != null ? explicitVer : tx == null ? nextVersion() : tx.writeVersion(); - old = (retval || intercept) ? rawGetOrUnmarshalUnlocked(!retval) : val; + boolean internal = isInternal() || !context().userCache(); + + Map<UUID, CacheContinuousQueryListener> lsnrCol = + notifyContinuousQueries(tx) ? cctx.continuousQueries().updateListeners(internal, false) : null; + + old = (retval || intercept || lsnrCol != null) ? + rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val; if (intercept) { entry0 = new CacheLazyEntry(cctx, key, old, keepBinary); @@ -1388,10 +1411,19 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme keepBinary); } - if (cctx.isLocal() || cctx.isReplicated() || - (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local()))) - cctx.continuousQueries().onEntryUpdated(key, null, old, isInternal() - || !context().userCache(),partition(), tx.local(), false, updateCntr0, topVer); + if (lsnrCol != null) { + cctx.continuousQueries().onEntryUpdated( + lsnrCol, + key, + null, + old, + internal, + partition(), + tx.local(), + false, + updateCntr0, + topVer); + } cctx.dataStructures().onEntryUpdated(key, true, keepBinary); @@ -1440,6 +1472,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme return new GridCacheUpdateTxResult(false, null); } + /** + * @param tx Transaction. + * @return {@code True} if should notify continuous query manager. + */ + private boolean notifyContinuousQueries(@Nullable IgniteInternalTx tx) { + return cctx.isLocal() || + cctx.isReplicated() || + (!isNear() && !(tx != null && tx.onePhaseCommit() && !tx.local())); + } + /** {@inheritDoc} */ @SuppressWarnings("unchecked") @Override public GridTuple3<Boolean, Object, EntryProcessorResult<Object>> innerUpdateLocal( @@ -1470,7 +1512,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme EntryProcessorResult<Object> invokeRes = null; synchronized (this) { - boolean needVal = retval || intercept || op == GridCacheOperation.TRANSFORM || !F.isEmpty(filter); + boolean internal = isInternal() || !context().userCache(); + + Map<UUID, CacheContinuousQueryListener> lsnrCol = + cctx.continuousQueries().updateListeners(internal, false); + + boolean needVal = retval || + intercept || + op == GridCacheOperation.TRANSFORM || + !F.isEmpty(filter) || + lsnrCol != null; checkObsolete(); @@ -1479,7 +1530,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme unswap(retval); // Possibly get old value form store. - old = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; + old = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val; boolean readFromStore = false; @@ -1731,11 +1782,20 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - if (!isNear()) { + if (lsnrCol != null) { long updateCntr = nextPartCounter(AffinityTopologyVersion.NONE); - cctx.continuousQueries().onEntryUpdated(key, val, old, isInternal() || !context().userCache(), - partition(), true, false, updateCntr, AffinityTopologyVersion.NONE); + cctx.continuousQueries().onEntryUpdated( + lsnrCol, + key, + val, + old, + internal, + partition(), + true, + false, + updateCntr, + AffinityTopologyVersion.NONE); } cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE, keepBinary); @@ -1997,8 +2057,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (updateCntr != null) updateCntr0 = updateCntr; - cctx.continuousQueries().onEntryUpdated(key, evtVal, prevVal, isInternal() - || !context().userCache(), partition(), primary, false, updateCntr0, topVer); + cctx.continuousQueries().onEntryUpdated( + key, + evtVal, + prevVal, + isInternal() || !context().userCache(), + partition(), + primary, + false, + updateCntr0, + topVer); } return new GridCacheUpdateAtomicResult(false, @@ -2019,7 +2087,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } // Prepare old value and value bytes. - oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval) : val; + oldVal = needVal ? rawGetOrUnmarshalUnlocked(!retval && !isOffHeapValuesOnly()) : val; // Possibly read value from store. boolean readFromStore = false; @@ -2937,7 +3005,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @return {@code True} if values should be stored off-heap. */ - protected boolean isOffHeapValuesOnly() { + protected final boolean isOffHeapValuesOnly() { return cctx.config().getMemoryMode() == CacheMemoryMode.OFFHEAP_VALUES; } @@ -3236,8 +3304,16 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme drReplicate(drType, val, ver); if (!skipQryNtf) { - cctx.continuousQueries().onEntryUpdated(key, val, null, this.isInternal() - || !this.context().userCache(), this.partition(), true, preload, updateCntr, topVer); + cctx.continuousQueries().onEntryUpdated( + key, + val, + null, + this.isInternal() || !this.context().userCache(), + this.partition(), + true, + preload, + updateCntr, + topVer); cctx.dataStructures().onEntryUpdated(key, false, true); } http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java index 0fef6f8..f091fc7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java @@ -822,10 +822,10 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm Object val = unmarshal(valPtr, !tmp); - if (val instanceof BinaryObjectOffheapImpl) - return (BinaryObjectOffheapImpl)val; + if (val instanceof CacheObject) + return (CacheObject)val; - return new CacheObjectImpl(val, null); + return toCacheObject(ctx.cacheObjectContext(), val, false); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 6c7bac5..fec61df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -76,6 +76,7 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; @@ -1992,6 +1993,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; + boolean initLsnrs = false; + Map<UUID, CacheContinuousQueryListener> lsnrs = null; + boolean internal = false; + // Avoid iterator creation. for (int i = 0; i < keys.size(); i++) { KeyCacheObject k = keys.get(i); @@ -2006,6 +2011,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (entry == null) continue; + if (!initLsnrs) { + internal = entry.isInternal() || !context().userCache(); + + lsnrs = ctx.continuousQueries().updateListeners(internal, false); + + initLsnrs = true; + } + GridCacheVersion newConflictVer = req.conflictVersion(i); long newConflictTtl = req.conflictTtl(i); long newConflictExpireTime = req.conflictExpireTime(i); @@ -2034,7 +2047,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.invokeArguments(), primary && writeThrough() && !req.skipStore(), !req.skipStore(), - sndPrevVal || req.returnValue(), + lsnrs != null || sndPrevVal || req.returnValue(), req.keepBinary(), expiry, true, @@ -2061,6 +2074,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { + dhtFut.listeners(lsnrs); + if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); @@ -2097,10 +2112,18 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); } } - else if (!entry.isNear() && updRes.success()) { - ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), updRes.oldValue(), - entry.isInternal() || !context().userCache(), entry.partition(), primary, false, - updRes.updateCounter(), topVer); + else if (lsnrs != null && updRes.success()) { + ctx.continuousQueries().onEntryUpdated( + lsnrs, + entry.key(), + updRes.newValue(), + updRes.oldValue(), + internal, + entry.partition(), + primary, + false, + updRes.updateCounter(), + topVer); } if (hasNear) { @@ -2275,6 +2298,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean intercept = ctx.config().getInterceptor() != null; + boolean initLsnrs = false; + Map<UUID, CacheContinuousQueryListener> lsnrs = null; + // Avoid iterator creation. for (int i = 0; i < entries.size(); i++) { GridDhtCacheEntry entry = entries.get(i); @@ -2308,6 +2334,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); } + if (!initLsnrs) { + lsnrs = ctx.continuousQueries().updateListeners( + entry.isInternal() || !context().userCache(), + false); + + initLsnrs = true; + } + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, node.id(), @@ -2317,7 +2351,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, /*write-through*/false, /*read-through*/false, - /*retval*/sndPrevVal, + /*retval*/sndPrevVal || lsnrs != null, req.keepBinary(), expiry, /*event*/true, @@ -2366,6 +2400,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { + dhtFut.listeners(lsnrs); + EntryProcessor<Object, Object, Object> entryProcessor = entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); @@ -2763,6 +2799,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); + boolean initLsnrs = false; + Map<UUID, CacheContinuousQueryListener> lsnrs = null; + boolean internal = false; + for (int i = 0; i < req.size(); i++) { KeyCacheObject key = req.key(i); @@ -2785,6 +2825,14 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { long ttl = req.ttl(i); long expireTime = req.conflictExpireTime(i); + if (!initLsnrs) { + internal = entry.isInternal() || !context().userCache(); + + lsnrs = ctx.continuousQueries().updateListeners(internal, false); + + initLsnrs = true; + } + GridCacheUpdateAtomicResult updRes = entry.innerUpdate( ver, nodeId, @@ -2794,7 +2842,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { op == TRANSFORM ? req.invokeArguments() : null, /*write-through*/false, /*read-through*/false, - /*retval*/false, + /*retval*/lsnrs != null, req.keepBinary(), /*expiry policy*/null, /*event*/true, @@ -2817,10 +2865,19 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); - if (updRes.success() && !entry.isNear()) - ctx.continuousQueries().onEntryUpdated(entry.key(), updRes.newValue(), - updRes.oldValue(), entry.isInternal() || !context().userCache(), entry.partition(), - false, false, updRes.updateCounter(), req.topologyVersion()); + if (lsnrs != null && updRes.success()) { + ctx.continuousQueries().onEntryUpdated( + lsnrs, + entry.key(), + updRes.newValue(), + updRes.oldValue(), + internal, + entry.partition(), + false, + false, + updRes.updateCounter(), + req.topologyVersion()); + } entry.onUnlock(); http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 06c8441..58d704d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryListener; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -102,6 +103,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> /** Response count. */ private volatile int resCnt; + /** */ + private Map<UUID, CacheContinuousQueryListener> lsnrs; + /** * @param cctx Cache context. * @param completionCb Callback to invoke when future is completed. @@ -136,6 +140,13 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> waitForExchange = !topLocked; } + /** + * @param lsnrs Continuous query listeners. + */ + void listeners(@Nullable Map<UUID, CacheContinuousQueryListener> lsnrs) { + this.lsnrs = lsnrs; + } + /** {@inheritDoc} */ @Override public IgniteUuid futureId() { return futVer.asGridUuid(); @@ -215,6 +226,8 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * @param ttl TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). + * @param addPrevVal If {@code true} sends previous value to backups. + * @param prevVal Previous value. * @param updateCntr Partition update counter. */ public void addWriteEntry(GridDhtCacheEntry entry, @@ -270,13 +283,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> addPrevVal, entry.partition(), prevVal, - updateCntr); + updateCntr, + lsnrs != null); } - else if (dhtNodes.size() == 1) { + else if (lsnrs != null && dhtNodes.size() == 1) { try { - cctx.continuousQueries().onEntryUpdated(entry.key(), val, prevVal, - entry.key().internal() || !cctx.userCache(), entry.partition(), true, false, - updateCntr, updateReq.topologyVersion()); + cctx.continuousQueries().onEntryUpdated( + lsnrs, + entry.key(), + val, + prevVal, + entry.key().internal() || !cctx.userCache(), + entry.partition(), + true, + false, + updateCntr, + updateReq.topologyVersion()); } catch (IgniteCheckedException e) { U.warn(log, "Failed to send continuous query message. [key=" + entry.key() + ", newVal=" @@ -352,7 +374,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> cctx.mvcc().removeAtomicFuture(version()); if (err != null) { - if (!mappings.isEmpty()) { + if (!mappings.isEmpty() && lsnrs != null) { Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { @@ -362,7 +384,11 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (!hndKeys.contains(key)) { updateRes.addFailedKey(key, err); - cctx.continuousQueries().skipUpdateEvent(key, req.partitionId(i), req.updateCounter(i), + cctx.continuousQueries().skipUpdateEvent( + lsnrs, + key, + req.partitionId(i), + req.updateCounter(i), updateReq.topologyVersion()); hndKeys.add(key); @@ -378,27 +404,38 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> updateRes.addFailedKey(key, err); } else { - Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); + if (lsnrs != null) { + Collection<KeyCacheObject> hndKeys = new ArrayList<>(keys.size()); - exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { - for (int i = 0; i < req.size(); i++) { - KeyCacheObject key = req.key(i); + exit: for (GridDhtAtomicUpdateRequest req : mappings.values()) { + for (int i = 0; i < req.size(); i++) { + KeyCacheObject key = req.key(i); - if (!hndKeys.contains(key)) { - try { - cctx.continuousQueries().onEntryUpdated(key, req.value(i), req.localPreviousValue(i), - key.internal() || !cctx.userCache(), req.partitionId(i), true, false, - req.updateCounter(i), updateReq.topologyVersion()); - } - catch (IgniteCheckedException e) { - U.warn(log, "Failed to send continuous query message. [key=" + key + ", newVal=" - + req.value(i) + ", err=" + e + "]"); - } + if (!hndKeys.contains(key)) { + try { + cctx.continuousQueries().onEntryUpdated( + lsnrs, + key, + req.value(i), + req.localPreviousValue(i), + key.internal() || !cctx.userCache(), + req.partitionId(i), + true, + false, + req.updateCounter(i), + updateReq.topologyVersion()); + } + catch (IgniteCheckedException e) { + U.warn(log, "Failed to send continuous query message. [key=" + key + + ", newVal=" + req.value(i) + + ", err=" + e + "]"); + } - hndKeys.add(key); + hndKeys.add(key); - if (hndKeys.size() == keys.size()) - break exit; + if (hndKeys.size() == keys.size()) + break exit; + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index 7cc276f..e417cdb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -28,6 +28,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.GridDirectCollection; import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.IgniteCodeGeneratingFail; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; @@ -49,6 +50,7 @@ import org.jetbrains.annotations.Nullable; /** * Lite dht cache backup update request. */ +@IgniteCodeGeneratingFail // Need add 'cleanup' call in 'writeTo' method. public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements GridCacheDeployable { /** */ private static final long serialVersionUID = 0L; @@ -215,7 +217,6 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid keys = new ArrayList<>(); partIds = new ArrayList<>(); - locPrevVals = new ArrayList<>(); if (forceTransformBackups) { entryProcessors = new ArrayList<>(); @@ -240,7 +241,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). * @param addPrevVal If {@code true} adds previous value. + * @param partId Partition. * @param prevVal Previous value. + * @param updateCntr Update counter. + * @param storeLocPrevVal If {@code true} stores previous value. */ public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, @@ -251,12 +255,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid boolean addPrevVal, int partId, @Nullable CacheObject prevVal, - @Nullable Long updateIdx) { + @Nullable Long updateCntr, + boolean storeLocPrevVal) { keys.add(key); partIds.add(partId); - locPrevVals.add(prevVal); + if (storeLocPrevVal) { + if (locPrevVals == null) + locPrevVals = new ArrayList<>(); + + locPrevVals.add(prevVal); + } if (forceTransformBackups) { assert entryProcessor != null; @@ -273,11 +283,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid prevVals.add(prevVal); } - if (updateIdx != null) { + if (updateCntr != null) { if (updateCntrs == null) updateCntrs = new GridLongList(); - updateCntrs.add(updateIdx); + updateCntrs.add(updateCntr); } // In case there is no conflict, do not create the list. @@ -521,6 +531,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @return Value. */ @Nullable public CacheObject localPreviousValue(int idx) { + assert locPrevVals != null; + return locPrevVals.get(idx); } @@ -849,6 +861,8 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } + cleanup(); + return true; } @@ -1048,6 +1062,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid return reader.afterMessageRead(GridDhtAtomicUpdateRequest.class); } + /** + * Cleanup values not needed after message was sent. + */ + private void cleanup() { + nearVals = null; + prevVals = null; + + // Do not keep values if they are not needed for continuous query notification. + if (locPrevVals == null) { + vals = null; + locPrevVals = null; + } + } + /** {@inheritDoc} */ @Override public byte directType() { return 38; http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java index 8f0cab7..0d8f795 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java @@ -1107,7 +1107,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte next = null; while (it.hasNext()) { - final LazySwapEntry e = new LazySwapEntry(it.next(), keepBinary); + final LazySwapEntry e = new LazySwapEntry(it.next()); if (filter != null) { K key = (K)cctx.unwrapBinaryIfNeeded(e.key(), keepBinary); @@ -2524,15 +2524,11 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte /** */ private final Map.Entry<byte[], byte[]> e; - /** */ - private boolean keepBinary; - /** * @param e Entry with */ - LazySwapEntry(Map.Entry<byte[], byte[]> e, boolean keepBinary) { + LazySwapEntry(Map.Entry<byte[], byte[]> e) { this.e = e; - this.keepBinary = keepBinary; } /** {@inheritDoc} */ @@ -2545,9 +2541,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @Override protected V unmarshalValue() throws IgniteCheckedException { IgniteBiTuple<byte[], Byte> t = GridCacheSwapEntryImpl.getValue(e.getValue()); - CacheObject obj = cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); - - return (V)cctx.cacheObjectContext().unwrapBinaryIfNeeded(obj, keepBinary); + return (V)cctx.cacheObjects().toCacheObject(cctx.cacheObjectContext(), t.get2(), t.get1()); } /** {@inheritDoc} */ @@ -2597,13 +2591,7 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte @Override protected V unmarshalValue() throws IgniteCheckedException { long ptr = GridCacheOffheapSwapEntry.valueAddress(valPtr.get1(), valPtr.get2()); - CacheObject obj = cctx.fromOffheap(ptr, false); - - V val = CU.value(obj, cctx, false); - - assert val != null; - - return val; + return (V)cctx.fromOffheap(ptr, false); } /** {@inheritDoc} */ @@ -2661,7 +2649,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte if (!filter.apply(key, val)) return null; - return new IgniteBiTuple<>(e.key(), (V)cctx.unwrapTemporary(e.value())); + if (key instanceof CacheObject) + ((CacheObject)key).prepareMarshal(cctx.cacheObjectContext()); + + val = (V)cctx.unwrapTemporary(e.value()); + + if (val instanceof CacheObject) + ((CacheObject)val).prepareMarshal(cctx.cacheObjectContext()); + + return new IgniteBiTuple<>(e.key(), val); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java index 7e66ad3..cf9b439 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java @@ -882,8 +882,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler * @return Continuous query entry. */ private CacheContinuousQueryEntry skipEntry(CacheContinuousQueryEntry e) { - if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1) { - + if (lastFiredCntr.get() > e.updateCounter() || e.updateCounter() == 1L) { e.markFiltered(); return e; http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java index 86abbef..dce04de 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java @@ -24,7 +24,7 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; /** * Continuous query listener. */ -interface CacheContinuousQueryListener<K, V> { +public interface CacheContinuousQueryListener<K, V> { /** * Query execution callback. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java index 0e4cb40..cc59989 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java @@ -60,6 +60,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.security.SecurityPermission; import org.apache.ignite.resources.LoggerResource; +import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentHashMap8; import static javax.cache.event.EventType.CREATED; @@ -155,37 +156,102 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { } /** + * @param lsnrs Listeners to notify. + * @param key Entry key. * @param partId Partition id. * @param updCntr Updated counter. * @param topVer Topology version. */ - public void skipUpdateEvent(KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) { - if (lsnrCnt.get() > 0) { - for (CacheContinuousQueryListener lsnr : lsnrs.values()) { - CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( - cctx.cacheId(), - UPDATED, - key, - null, - null, - lsnr.keepBinary(), - partId, - updCntr, - topVer); + public void skipUpdateEvent(Map<UUID, CacheContinuousQueryListener> lsnrs, + KeyCacheObject key, int partId, long updCntr, AffinityTopologyVersion topVer) { + assert lsnrs != null; - CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( - cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); + for (CacheContinuousQueryListener lsnr : lsnrs.values()) { + CacheContinuousQueryEntry e0 = new CacheContinuousQueryEntry( + cctx.cacheId(), + UPDATED, + key, + null, + null, + lsnr.keepBinary(), + partId, + updCntr, + topVer); - lsnr.skipUpdateEvent(evt, topVer); - } + CacheContinuousQueryEvent evt = new CacheContinuousQueryEvent<>( + cctx.kernalContext().cache().jcache(cctx.name()), cctx, e0); + + lsnr.skipUpdateEvent(evt, topVer); } } /** + * @param internal Internal entry flag (internal key or not user cache). + * @param preload Whether update happened during preloading. + * @return Registered listeners. + */ + @Nullable public Map<UUID, CacheContinuousQueryListener> updateListeners( + boolean internal, + boolean preload) { + if (preload && !internal) + return null; + + ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol; + + if (internal) + lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null; + else + lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; + + return F.isEmpty(lsnrCol) ? null : lsnrCol; + } + + /** + * @param key Key. + * @param newVal New value. + * @param oldVal Old value. + * @param internal Internal entry (internal key or not user cache). + * @param partId Partition. + * @param primary {@code True} if called on primary node. + * @param preload Whether update happened during preloading. + * @param updateCntr Update counter. + * @param topVer Topology version. + * @throws IgniteCheckedException In case of error. + */ + public void onEntryUpdated( + KeyCacheObject key, + CacheObject newVal, + CacheObject oldVal, + boolean internal, + int partId, + boolean primary, + boolean preload, + long updateCntr, + AffinityTopologyVersion topVer) throws IgniteCheckedException { + Map<UUID, CacheContinuousQueryListener> lsnrCol = updateListeners(internal, preload); + + if (lsnrCol != null) { + onEntryUpdated( + lsnrCol, + key, + newVal, + oldVal, + internal, + partId, + primary, + preload, + updateCntr, + topVer); + } + } + + /** + * @param lsnrCol Listeners to notify. * @param key Key. * @param newVal New value. * @param oldVal Old value. * @param internal Internal entry (internal key or not user cache), + * @param partId Partition. * @param primary {@code True} if called on primary node. * @param preload Whether update happened during preloading. * @param updateCntr Update counter. @@ -193,6 +259,7 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { * @throws IgniteCheckedException In case of error. */ public void onEntryUpdated( + Map<UUID, CacheContinuousQueryListener> lsnrCol, KeyCacheObject key, CacheObject newVal, CacheObject oldVal, @@ -205,25 +272,16 @@ public class CacheContinuousQueryManager extends GridCacheManagerAdapter { throws IgniteCheckedException { assert key != null; - - if (preload && !internal) - return; - - ConcurrentMap<UUID, CacheContinuousQueryListener> lsnrCol; - - if (internal) - lsnrCol = intLsnrCnt.get() > 0 ? intLsnrs : null; - else - lsnrCol = lsnrCnt.get() > 0 ? lsnrs : null; - - if (F.isEmpty(lsnrCol)) - return; + assert lsnrCol != null; boolean hasNewVal = newVal != null; boolean hasOldVal = oldVal != null; - if (!hasNewVal && !hasOldVal) + if (!hasNewVal && !hasOldVal) { + skipUpdateEvent(lsnrCol, key, partId, updateCntr, topVer); + return; + } EventType evtType = !hasNewVal ? REMOVED : !hasOldVal ? CREATED : UPDATED; http://git-wip-us.apache.org/repos/asf/ignite/blob/4c05fc02/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java index 7c7e3e3..0218897 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.GridProcessorAdapter; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheProcessor; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandler; import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; import org.apache.ignite.internal.util.future.GridFinishedFuture; @@ -893,11 +894,18 @@ public class GridContinuousProcessor extends GridProcessorAdapter { } // Load partition counters. - if (hnd.isQuery() && ctx.cache() != null && ctx.cache().internalCache(hnd.cacheName()) != null) { - Map<Integer, Long> cntrs = ctx.cache().internalCache(hnd.cacheName()) - .context().topology().updateCounters(); + if (hnd.isQuery()) { + GridCacheProcessor proc = ctx.cache(); - req.addUpdateCounters(cntrs); + if (proc != null) { + GridCacheAdapter cache = ctx.cache().internalCache(hnd.cacheName()); + + if (cache != null && !cache.isLocal()) { + Map<Integer, Long> cntrs = cache.context().topology().updateCounters(); + + req.addUpdateCounters(cntrs); + } + } } if (err != null)
