IGNITE-426 WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0408ed87 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0408ed87 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0408ed87 Branch: refs/heads/ignite-426-2-reb Commit: 0408ed87bfa7ac251eb57423d1a422d2f0b71d89 Parents: 384e57c Author: nikolay_tikhonov <[email protected]> Authored: Fri Sep 11 18:57:59 2015 +0300 Committer: nikolay_tikhonov <[email protected]> Committed: Wed Oct 28 15:14:09 2015 +0300 ---------------------------------------------------------------------- .../internal/GridEventConsumeHandler.java | 21 +- .../internal/GridMessageListenHandler.java | 17 + .../communication/GridIoMessageFactory.java | 18 + .../processors/cache/GridCacheEntryEx.java | 4 +- .../processors/cache/GridCacheMapEntry.java | 142 ++++++-- .../GridCachePartitionExchangeManager.java | 4 +- .../cache/GridCacheUpdateAtomicResult.java | 15 +- .../dht/atomic/GridDhtAtomicCache.java | 83 +++-- .../dht/atomic/GridDhtAtomicUpdateFuture.java | 22 +- .../dht/atomic/GridDhtAtomicUpdateRequest.java | 124 +++++-- .../GridDhtPartitionsExchangeFuture.java | 35 +- .../preloader/GridDhtPartitionsFullMessage.java | 57 ++- .../GridDhtPartitionsSingleMessage.java | 49 ++- .../distributed/near/GridNearAtomicCache.java | 14 +- .../continuous/CacheContinuousQueryEntry.java | 7 + .../CacheContinuousQueryFilteredEntry.java | 228 ++++++++++++ .../continuous/CacheContinuousQueryHandler.java | 294 ++++++++++++--- .../CacheContinuousQueryListener.java | 8 + .../CacheContinuousQueryLostPartition.java | 156 ++++++++ .../continuous/CacheContinuousQueryManager.java | 22 +- .../continuous/GridContinuousBatch.java | 39 +- .../continuous/GridContinuousBatchAdapter.java | 43 ++- .../continuous/GridContinuousHandler.java | 22 ++ .../continuous/GridContinuousProcessor.java | 5 + .../processors/cache/GridCacheTestEntryEx.java | 4 +- ...acheContinuousQueryFailoverAbstractTest.java | 357 ++++++++++++++++++- .../testframework/junits/GridAbstractTest.java | 2 +- 27 files changed, 1612 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java index b4ce4ab..ade7597 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java @@ -38,6 +38,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheDeployable; import org.apache.ignite.internal.processors.cache.GridCacheDeploymentManager; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.processors.platform.PlatformEventFilterListener; import org.apache.ignite.internal.util.typedef.F; @@ -213,8 +215,8 @@ class GridEventConsumeHandler implements GridContinuousHandler { } } - ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, false, - false); + ctx.continuous().addNotification(t3.get1(), t3.get2(), wrapper, null, + false, false); } catch (ClusterTopologyCheckedException ignored) { // No-op. @@ -377,6 +379,21 @@ class GridEventConsumeHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void partitionLost(String cacheName, int partId) { + // No-op. + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java index ff38949..e038794 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridMessageListenHandler.java @@ -26,6 +26,8 @@ import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.managers.deployment.GridDeployment; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatch; +import org.apache.ignite.internal.processors.continuous.GridContinuousBatchAdapter; import org.apache.ignite.internal.processors.continuous.GridContinuousHandler; import org.apache.ignite.internal.util.lang.GridPeerDeployAware; import org.apache.ignite.internal.util.typedef.internal.S; @@ -167,6 +169,21 @@ public class GridMessageListenHandler implements GridContinuousHandler { } /** {@inheritDoc} */ + @Override public GridContinuousBatch createBatch() { + return new GridContinuousBatchAdapter(); + } + + /** {@inheritDoc} */ + @Override public void onBatchAcknowledged(UUID routineId, GridContinuousBatch batch, GridKernalContext ctx) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void partitionLost(String cacheName, int partId) { + // No-op. + } + + /** {@inheritDoc} */ @Nullable @Override public Object orderedTopic() { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java index 079015c..6eb9e17 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java @@ -90,7 +90,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlo import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest; import org.apache.ignite.internal.processors.cache.query.GridCacheQueryResponse; import org.apache.ignite.internal.processors.cache.query.GridCacheSqlQuery; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryBatchAck; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryEntry; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilteredEntry; +import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryLostPartition; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.TxEntryValueHolder; @@ -684,6 +687,21 @@ public class GridIoMessageFactory implements MessageFactory { break; + case 114: + msg = new CacheContinuousQueryBatchAck(); + + break; + + case 115: + msg = new CacheContinuousQueryFilteredEntry(); + + break; + + case 116: + msg = new CacheContinuousQueryLostPartition(); + + break; + // [-3..112] - this // [120..123] - DR // [-4..-22] - SQL http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java index 50b01c8..eb40d20 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEntryEx.java @@ -480,7 +480,9 @@ public interface GridCacheEntryEx { boolean conflictResolve, boolean intercept, @Nullable UUID subjId, - String taskName + String taskName, + @Nullable CacheObject prevVal, + @Nullable Long updateIdx ) throws IgniteCheckedException, GridCacheEntryRemovedException; /** http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 2111594..d8fa93c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -37,6 +37,7 @@ import org.apache.ignite.internal.managers.deployment.GridDeploymentInfo; import org.apache.ignite.internal.managers.deployment.GridDeploymentInfoBean; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheEntry; import org.apache.ignite.internal.processors.cache.extras.GridCacheEntryExtras; import org.apache.ignite.internal.processors.cache.extras.GridCacheMvccEntryExtras; @@ -1076,6 +1077,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Object key0 = null; Object val0 = null; + long updateIdx0; + synchronized (this) { checkObsolete(); @@ -1153,6 +1156,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme deletedUnlocked(false); } + updateIdx0 = nextPartIndex(topVer); + update(val, expireTime, ttl, newVer); drReplicate(drType, val, newVer); @@ -1178,8 +1183,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme subjId, null, taskName); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); + if (!isNear()) + cctx.continuousQueries().onEntryUpdated(this, key, val, old, tx.local(), false, updateIdx0, topVer); cctx.dataStructures().onEntryUpdated(key, false); } @@ -1244,6 +1249,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Cache.Entry entry0 = null; + Long updateIdx0; + synchronized (this) { checkObsolete(); @@ -1256,7 +1263,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } assert tx == null || (!tx.local() && tx.onePhaseCommit()) || tx.ownsLock(this) : - "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; + "Transaction does not own lock for remove[entry=" + this + ", tx=" + tx + ']'; boolean startVer = isStartVersion(); @@ -1313,6 +1320,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } + updateIdx0 = nextPartIndex(topVer); + +// if (updateIdx != null) +// updateIdx0 = updateIdx; + drReplicate(drType, null, newVer); if (metrics && cctx.cache().configuration().isStatisticsEnabled()) @@ -1345,8 +1357,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme taskName); } - if (cctx.isLocal() || cctx.isReplicated() || (tx != null && tx.local() && !isNear())) - cctx.continuousQueries().onEntryUpdated(this, key, null, old, false); + if (!isNear()) + cctx.continuousQueries().onEntryUpdated(this, key, null, old, tx.local(), false, updateIdx0, topVer); cctx.dataStructures().onEntryUpdated(key, true); } @@ -1688,7 +1700,12 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - cctx.continuousQueries().onEntryUpdated(this, key, val, old, false); + if (!isNear()) { + long updateIdx = nextPartIndex(AffinityTopologyVersion.NONE); + + cctx.continuousQueries().onEntryUpdated(this, key, val, old, true, false, updateIdx, + AffinityTopologyVersion.NONE); + } cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -1731,7 +1748,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme boolean conflictResolve, boolean intercept, @Nullable UUID subjId, - String taskName + String taskName, + @Nullable CacheObject prevVal, + @Nullable Long updateIdx ) throws IgniteCheckedException, GridCacheEntryRemovedException, GridClosureException { assert cctx.atomic(); @@ -1740,7 +1759,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CacheObject oldVal; CacheObject updated; - GridCacheVersion enqueueVer = null; + GridCacheVersion rmvVer = null; GridCacheVersionConflictContext<?, ?> conflictCtx = null; @@ -1757,6 +1776,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme Object key0 = null; Object updated0 = null; + Long updateIdx0 = null; + synchronized (this) { boolean needVal = intercept || retval || op == GridCacheOperation.TRANSFORM || !F.isEmptyOrNulls(filter); @@ -1864,7 +1885,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateIdx0); } // Will update something. else { @@ -1913,6 +1935,38 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme "[entry=" + this + ", newVer=" + newVer + ']'); } + if (!cctx.isNear()) { + CacheObject evtVal; + + if (op == GridCacheOperation.TRANSFORM) { + EntryProcessor<Object, Object, ?> entryProcessor = + (EntryProcessor<Object, Object, ?>)writeObj; + + CacheInvokeEntry<Object, Object> entry = + new CacheInvokeEntry<>(cctx, key, prevVal, version()); + + try { + entryProcessor.process(entry, invokeArgs); + + evtVal = entry.modified() ? + cctx.toCacheObject(cctx.unwrapTemporary(entry.getValue())) : prevVal; + } + catch (Exception e) { + evtVal = prevVal; + } + } + else + evtVal = (CacheObject)writeObj; + + updateIdx0 = nextPartIndex(topVer); + + if (updateIdx != null) + updateIdx0 = updateIdx; + + cctx.continuousQueries().onEntryUpdated(this, key, evtVal, prevVal, primary, false, + updateIdx0, topVer); + } + return new GridCacheUpdateAtomicResult(false, retval ? rawGetOrUnmarshalUnlocked(false) : null, null, @@ -1921,7 +1975,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateIdx0); } } else @@ -1997,7 +2052,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateIdx0); } } @@ -2044,7 +2100,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateIdx0); } } else @@ -2144,7 +2201,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateIdx0); else if (interceptorVal != updated0) { updated0 = cctx.unwrapTemporary(interceptorVal); @@ -2181,6 +2239,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme update(updated, newExpireTime, newTtl, newVer); + updateIdx0 = nextPartIndex(topVer); + + if (updateIdx != null) + updateIdx0 = updateIdx; + drReplicate(drType, updated, newVer); recordNodeId(affNodeId, topVer); @@ -2220,7 +2283,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme CU.EXPIRE_TIME_ETERNAL, null, null, - false); + false, + updateIdx0); } if (writeThrough) @@ -2252,7 +2316,7 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } - enqueueVer = newVer; + rmvVer = newVer; boolean hasValPtr = hasOffHeapPointer(); @@ -2272,6 +2336,11 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme recordNodeId(affNodeId, topVer); + updateIdx0 = nextPartIndex(topVer); + + if (updateIdx != null) + updateIdx0 = updateIdx; + drReplicate(drType, null, newVer); if (evt) { @@ -2301,8 +2370,8 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme if (res) updateMetrics(op, metrics); - if (cctx.isReplicated() || primary) - cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, false); + if (!isNear()) + cctx.continuousQueries().onEntryUpdated(this, key, val, oldVal, primary, false, updateIdx0, topVer); cctx.dataStructures().onEntryUpdated(key, op == GridCacheOperation.DELETE); @@ -2326,9 +2395,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme invokeRes, newSysTtl, newSysExpireTime, - enqueueVer, + rmvVer, conflictCtx, - true); + true, + updateIdx0); } /** @@ -3149,9 +3219,13 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme drReplicate(drType, val, ver); + long updateIdx = -1; + + if (!preload) + updateIdx = nextPartIndex(topVer); + if (!skipQryNtf) { - if (cctx.isLocal() || cctx.isReplicated() || cctx.affinity().primary(cctx.localNode(), key, topVer)) - cctx.continuousQueries().onEntryUpdated(this, key, val, null, preload); + cctx.continuousQueries().onEntryUpdated(this, key, val, null, true, preload, updateIdx, topVer); cctx.dataStructures().onEntryUpdated(key, false); } @@ -3168,6 +3242,28 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme } } + /** + * @param topVer Topology version. + * @return Update index. + */ + private long nextPartIndex(AffinityTopologyVersion topVer) { + long updateIdx; + + //U.dumpStack(); + + if (!cctx.isLocal() && !isNear()) { + GridDhtLocalPartition locPart = cctx.topology().localPartition(partition(), topVer, false); + + assert locPart != null; + + updateIdx = locPart.nextContinuousQueryUpdateIndex(); + } + else + updateIdx = 0; + + return updateIdx; + } + /** {@inheritDoc} */ @Override public synchronized boolean initialValue(KeyCacheObject key, GridCacheSwapEntry unswapped) throws IgniteCheckedException, @@ -4053,7 +4149,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme */ protected void deletedUnlocked(boolean deleted) { assert Thread.holdsLock(this); - assert cctx.deferredDelete(); + + if (!cctx.deferredDelete()) + return; if (deleted) { assert !deletedUnlocked() : this; http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index adc2174..0065403 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -892,7 +892,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) - updated |= top.update(null, entry.getValue()) != null; + updated |= top.update(null, entry.getValue(), null) != null; } if (!cctx.kernalContext().clientNode() && updated) @@ -935,7 +935,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana top = cacheCtx.topology(); if (top != null) - updated |= top.update(null, entry.getValue()) != null; + updated |= top.update(null, entry.getValue(), null) != null; } if (updated) http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java index 3674284..092d990 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUpdateAtomicResult.java @@ -57,6 +57,9 @@ public class GridCacheUpdateAtomicResult { /** Whether update should be propagated to DHT node. */ private final boolean sndToDht; + /** */ + private final Long updateIdx; + /** Value computed by entry processor. */ private IgniteBiTuple<Object, Exception> res; @@ -72,6 +75,7 @@ public class GridCacheUpdateAtomicResult { * @param rmvVer Version for deferred delete. * @param conflictRes DR resolution result. * @param sndToDht Whether update should be propagated to DHT node. + * @param updateIdx Partition update counter. */ public GridCacheUpdateAtomicResult(boolean success, @Nullable CacheObject oldVal, @@ -81,7 +85,8 @@ public class GridCacheUpdateAtomicResult { long conflictExpireTime, @Nullable GridCacheVersion rmvVer, @Nullable GridCacheVersionConflictContext<?, ?> conflictRes, - boolean sndToDht) { + boolean sndToDht, + long updateIdx) { this.success = success; this.oldVal = oldVal; this.newVal = newVal; @@ -91,6 +96,7 @@ public class GridCacheUpdateAtomicResult { this.rmvVer = rmvVer; this.conflictRes = conflictRes; this.sndToDht = sndToDht; + this.updateIdx = updateIdx; } /** @@ -129,6 +135,13 @@ public class GridCacheUpdateAtomicResult { } /** + * @return Partition update index. + */ + public Long updateIdx() { + return updateIdx; + } + + /** * @return Explicit conflict expire time (if any). Set only if it is necessary to propagate concrete expire time * value to DHT node. Otherwise set to {@link GridCacheUtils#EXPIRE_TIME_CALCULATE}. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 4cd9e84..8eabae1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -65,6 +65,7 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheA import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; @@ -1103,7 +1104,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; try { - topology().readLock(); + GridDhtPartitionTopology top = topology(); + + top.readLock(); try { if (topology().stopping()) { @@ -1120,7 +1123,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { // Also do not check topology version if topology was locked on near node by // external transaction or explicit lock. if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || - !needRemap(req.topologyVersion(), topology().topologyVersion())) { + !needRemap(req.topologyVersion(), top.topologyVersion())) { ClusterNode node = ctx.discovery().node(nodeId); if (node == null) { @@ -1135,7 +1138,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (ver == null) { // Assign next version for update inside entries lock. - ver = ctx.versions().next(topology().topologyVersion()); + ver = ctx.versions().next(top.topologyVersion()); if (hasNear) res.nearVersion(ver); @@ -1147,6 +1150,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { log.debug("Using cache version for update request on primary node [ver=" + ver + ", req=" + req + ']'); + boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); + dhtFut = createDhtFuture(ver, req, res, completionCb, false); expiry = expiryPolicy(req.expiry()); @@ -1169,7 +1174,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb, ctx.isDrEnabled(), taskName, - expiry); + expiry, + sndPrevVal); deleted = updRes.deleted(); dhtFut = updRes.dhtFuture(); @@ -1188,7 +1194,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb, ctx.isDrEnabled(), taskName, - expiry); + expiry, + sndPrevVal); retVal = updRes.returnValue(); deleted = updRes.deleted(); @@ -1208,7 +1215,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { remap = true; } finally { - topology().readUnlock(); + top.readUnlock(); } } catch (GridCacheEntryRemovedException e) { @@ -1283,6 +1290,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param replicate Whether replication is enabled. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Deleted entries. * @throws GridCacheEntryRemovedException Should not be thrown. */ @@ -1298,7 +1306,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) throws GridCacheEntryRemovedException { assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. @@ -1445,7 +1454,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); firstEntryIdx = i; @@ -1493,7 +1503,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); firstEntryIdx = i; @@ -1612,7 +1623,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { replicate, updRes, taskName, - expiry); + expiry, + sndPrevVal); } else assert filtered.isEmpty(); @@ -1689,6 +1701,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param replicate Whether DR is enabled for that cache. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Return value. * @throws GridCacheEntryRemovedException Should be never thrown. */ @@ -1703,7 +1716,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb, boolean replicate, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) throws GridCacheEntryRemovedException { GridCacheReturn retVal = null; Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; @@ -1760,7 +1774,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { req.invokeArguments(), primary && writeThrough() && !req.skipStore(), !req.skipStore(), - req.returnValue(), + sndPrevVal || req.returnValue(), expiry, true, true, @@ -1775,7 +1789,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { true, intercept, req.subjectId(), - taskName); + taskName, + null, + null); if (dhtFut == null && !F.isEmpty(filteredReaders)) { dhtFut = createDhtFuture(ver, req, res, completionCb, true); @@ -1792,22 +1808,22 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { else if (conflictCtx.isMerge()) newConflictVer = null; // Conflict version is discarded in case of merge. - EntryProcessor<Object, Object, Object> entryProcessor = null; - if (!readersOnly) { dhtFut.addWriteEntry(entry, updRes.newValue(), - entryProcessor, + op == TRANSFORM ? req.entryProcessor(i) : null, updRes.newTtl(), updRes.conflictExpireTime(), - newConflictVer); + newConflictVer, + sndPrevVal, + updRes.oldValue(), + updRes.updateIdx()); } if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, entry, updRes.newValue(), - entryProcessor, updRes.newTtl(), updRes.conflictExpireTime()); } @@ -1907,6 +1923,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param batchRes Batch update result. * @param taskName Task name. * @param expiry Expiry policy. + * @param sndPrevVal If {@code true} sends previous value to backups. * @return Deleted entries. */ @SuppressWarnings("ForLoopReplaceableByForEach") @@ -1927,7 +1944,8 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { boolean replicate, UpdateBatchResult batchRes, String taskName, - @Nullable IgniteCacheExpiryPolicy expiry + @Nullable IgniteCacheExpiryPolicy expiry, + boolean sndPrevVal ) { assert putMap == null ^ rmvKeys == null; @@ -2029,7 +2047,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { null, /*write-through*/false, /*read-through*/false, - /*retval*/false, + /*retval*/sndPrevVal, expiry, /*event*/true, /*metrics*/true, @@ -2044,7 +2062,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*conflict resolve*/false, /*intercept*/false, req.subjectId(), - taskName); + taskName, + null, + null); assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; @@ -2074,22 +2094,21 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } if (dhtFut != null) { - EntryProcessor<Object, Object, Object> entryProcessor = - entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); - if (!batchRes.readersOnly()) dhtFut.addWriteEntry(entry, writeVal, - entryProcessor, + entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()), updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE, - null); + null, + sndPrevVal, + updRes.oldValue(), + updRes.updateIdx()); if (!F.isEmpty(filteredReaders)) dhtFut.addNearWriteEntries(filteredReaders, entry, writeVal, - entryProcessor, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); } @@ -2493,7 +2512,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { entry = entryExx(key); CacheObject val = req.value(i); + CacheObject prevVal = req.previousValue(i); EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); + Long updateIdx = req.updateIdx(i); GridCacheOperation op = entryProcessor != null ? TRANSFORM : (val != null) ? UPDATE : DELETE; @@ -2515,7 +2536,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /*event*/true, /*metrics*/true, /*primary*/false, - /*check version*/!req.forceTransformBackups(), + /*check version*/op != TRANSFORM || !req.forceTransformBackups(), req.topologyVersion(), CU.empty0(), replicate ? DR_BACKUP : DR_NONE, @@ -2525,7 +2546,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { false, intercept, req.subjectId(), - taskName); + taskName, + prevVal, + updateIdx); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); @@ -2567,7 +2590,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } catch (ClusterTopologyCheckedException ignored) { U.warn(log, "Failed to send DHT atomic update response to node because it left grid: " + - req.nodeId()); + nodeId); } catch (IgniteCheckedException e) { U.error(log, "Failed to send DHT atomic update response (did node leave grid?) [nodeId=" + nodeId + http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4ace5c4..0d2f580 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -36,6 +36,7 @@ import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -132,6 +133,9 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()); waitForExchange = !topLocked; + + // We can send entry processor instead of value to backup if updates are ordered. + forceTransformBackups = updateReq.operation() == GridCacheOperation.TRANSFORM; } /** {@inheritDoc} */ @@ -198,16 +202,22 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * @param ttl TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). + * @param updateIdx Partition update index. */ public void addWriteEntry(GridDhtCacheEntry entry, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + @Nullable CacheObject prevVal, + @Nullable Long updateIdx) { AffinityTopologyVersion topVer = updateReq.topologyVersion(); - Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(entry.partition(), topVer); + int part = entry.partition(); + + Collection<ClusterNode> dhtNodes = cctx.dht().topology().nodes(part, topVer); if (log.isDebugEnabled()) log.debug("Mapping entry to DHT nodes [nodes=" + U.nodeIds(dhtNodes) + ", entry=" + entry + ']'); @@ -244,7 +254,10 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> entryProcessor, ttl, conflictExpireTime, - conflictVer); + conflictVer, + addPrevVal, + prevVal, + updateIdx); } } } @@ -253,14 +266,12 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> * @param readers Entry readers. * @param entry Entry. * @param val Value. - * @param entryProcessor Entry processor.. * @param ttl TTL for near cache update (optional). * @param expireTime Expire time for near cache update (optional). */ public void addNearWriteEntries(Iterable<UUID> readers, GridDhtCacheEntry entry, @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long expireTime) { CacheWriteSynchronizationMode syncMode = updateReq.writeSynchronizationMode(); @@ -302,7 +313,6 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> updateReq.addNearWriteValue(entry.key(), val, - entryProcessor, ttl, expireTime); } http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index e55cac9..4d27bfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -78,6 +78,11 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid @GridDirectCollection(CacheObject.class) private List<CacheObject> vals; + /** Previous values. */ + @GridToStringInclude + @GridDirectCollection(CacheObject.class) + private List<CacheObject> prevVals; + /** Conflict versions. */ @GridDirectCollection(GridCacheVersion.class) private List<GridCacheVersion> conflictVers; @@ -139,6 +144,9 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Task name hash. */ private int taskNameHash; + /** Partition. */ + private GridLongList updateCntrs; + /** * Empty constructor required by {@link Externalizable}. */ @@ -212,13 +220,18 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid * @param ttl TTL (optional). * @param conflictExpireTime Conflict expire time (optional). * @param conflictVer Conflict version (optional). + * @param addPrevVal If {@code true} adds previous value. + * @param prevVal Previous value. */ public void addWriteValue(KeyCacheObject key, @Nullable CacheObject val, EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long conflictExpireTime, - @Nullable GridCacheVersion conflictVer) { + @Nullable GridCacheVersion conflictVer, + boolean addPrevVal, + @Nullable CacheObject prevVal, + @Nullable Long updateIdx) { keys.add(key); if (forceTransformBackups) { @@ -229,6 +242,20 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid else vals.add(val); + if (addPrevVal) { + if (prevVals == null) + prevVals = new ArrayList<>(); + + prevVals.add(prevVal); + } + + if (updateIdx != null) { + if (updateCntrs == null) + updateCntrs = new GridLongList(); + + updateCntrs.add(updateIdx); + } + // In case there is no conflict, do not create the list. if (conflictVer != null) { if (conflictVers == null) { @@ -271,36 +298,21 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param key Key to add. * @param val Value, {@code null} if should be removed. - * @param entryProcessor Entry processor. * @param ttl TTL. * @param expireTime Expire time. */ public void addNearWriteValue(KeyCacheObject key, @Nullable CacheObject val, - EntryProcessor<Object, Object, Object> entryProcessor, long ttl, long expireTime) { if (nearKeys == null) { nearKeys = new ArrayList<>(); - - if (forceTransformBackups) { - nearEntryProcessors = new ArrayList<>(); - nearEntryProcessorsBytes = new ArrayList<>(); - } - else - nearVals = new ArrayList<>(); + nearVals = new ArrayList<>(); } nearKeys.add(key); - - if (forceTransformBackups) { - assert entryProcessor != null; - - nearEntryProcessors.add(entryProcessor); - } - else - nearVals.add(val); + nearVals.add(val); if (ttl >= 0) { if (nearTtls == null) { @@ -411,6 +423,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** + * @param idx Counter index. + * @return Update counter. + */ + public Long updateIdx(int idx) { + if (idx < updateCntrs.size()) + return updateCntrs.get(idx); + + return null; + } + + /** * @param idx Near key index. * @return Key. */ @@ -431,6 +454,17 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** * @param idx Key index. + * @return Value. + */ + @Nullable public CacheObject previousValue(int idx) { + if (prevVals != null) + return prevVals.get(idx); + + return null; + } + + /** + * @param idx Key index. * @return Entry processor. */ @Nullable public EntryProcessor<Object, Object, Object> entryProcessor(int idx) { @@ -684,42 +718,54 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid writer.incrementState(); case 16: - if (!writer.writeUuid("subjId", subjId)) + if (!writer.writeMessage("updateCntrs", updateCntrs)) return false; writer.incrementState(); case 17: - if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) + if (!writer.writeCollection("prevVals", prevVals, MessageCollectionItemType.MSG)) return false; writer.incrementState(); case 18: - if (!writer.writeInt("taskNameHash", taskNameHash)) + if (!writer.writeUuid("subjId", subjId)) return false; writer.incrementState(); case 19: - if (!writer.writeMessage("topVer", topVer)) + if (!writer.writeByte("syncMode", syncMode != null ? (byte)syncMode.ordinal() : -1)) return false; writer.incrementState(); case 20: - if (!writer.writeMessage("ttls", ttls)) + if (!writer.writeInt("taskNameHash", taskNameHash)) return false; writer.incrementState(); case 21: - if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + if (!writer.writeMessage("topVer", topVer)) return false; writer.incrementState(); case 22: + if (!writer.writeMessage("ttls", ttls)) + return false; + + writer.incrementState(); + + case 23: + if (!writer.writeCollection("vals", vals, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 24: if (!writer.writeMessage("writeVer", writeVer)) return false; @@ -846,7 +892,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 16: - subjId = reader.readUuid("subjId"); + updateCntrs = reader.readMessage("updateCntrs"); if (!reader.isLastRead()) return false; @@ -854,6 +900,22 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); case 17: + prevVals = reader.readCollection("prevVals", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 19: byte syncModeOrd; syncModeOrd = reader.readByte("syncMode"); @@ -865,7 +927,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 18: + case 20: taskNameHash = reader.readInt("taskNameHash"); if (!reader.isLastRead()) @@ -873,7 +935,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 19: + case 21: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -881,7 +943,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 20: + case 22: ttls = reader.readMessage("ttls"); if (!reader.isLastRead()) @@ -889,7 +951,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 21: + case 23: vals = reader.readCollection("vals", MessageCollectionItemType.MSG); if (!reader.isLastRead()) @@ -897,7 +959,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid reader.incrementState(); - case 22: + case 24: writeVer = reader.readMessage("writeVer"); if (!reader.isLastRead()) @@ -917,7 +979,7 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 23; + return 25; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 77e47a7..cb4bb4a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -613,7 +613,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (updateTop) { for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { if (top.cacheId() == cacheCtx.cacheId()) { - cacheCtx.topology().update(exchId, top.partitionMap(true)); + cacheCtx.topology().update(exchId, + top.partitionMap(true), + top.updateCounters()); break; } @@ -811,6 +813,8 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } } + boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT; + for (GridCacheContext cacheCtx : cctx.cacheContexts()) { if (cacheCtx.isLocal()) continue; @@ -821,6 +825,9 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT if (drCacheCtx.isDrEnabled()) drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft()); + if (topChanged) + cacheCtx.continuousQueries().beforeExchange(exchId.topologyVersion()); + // Partition release future is done so we can flush the write-behind store. cacheCtx.store().forceFlush(); @@ -954,14 +961,18 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT * @param id ID. * @throws IgniteCheckedException If failed. */ - private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) throws IgniteCheckedException { + private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) + throws IgniteCheckedException { GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, clientOnlyExchange, cctx.versions().last()); for (GridCacheContext cacheCtx : cctx.cacheContexts()) { - if (!cacheCtx.isLocal()) + if (!cacheCtx.isLocal()) { m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap()); + + m.partitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); + } } if (log.isDebugEnabled()) @@ -987,15 +998,21 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0; - if (ready) + if (ready) { m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); + + m.addPartitionUpdateCounters(cacheCtx.cacheId(), cacheCtx.topology().updateCounters()); + } } } // It is important that client topologies be added after contexts. - for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) + for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); + m.addPartitionUpdateCounters(top.cacheId(), top.updateCounters()); + } + if (log.isDebugEnabled()) log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + ", exchId=" + exchId + ", msg=" + m + ']'); @@ -1332,15 +1349,17 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { Integer cacheId = entry.getKey(); + Map<Integer, Long> cntrMap = msg.partitionUpdateCounters(cacheId); + GridCacheContext cacheCtx = cctx.cacheContext(cacheId); if (cacheCtx != null) - cacheCtx.topology().update(exchId, entry.getValue()); + cacheCtx.topology().update(exchId, entry.getValue(), cntrMap); else { ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); if (oldest != null && oldest.isLocal()) - cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); + cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue(), cntrMap); } } } @@ -1358,7 +1377,7 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : cctx.exchange().clientTopology(cacheId, this); - top.update(exchId, entry.getValue()); + top.update(exchId, entry.getValue(), msg.partitionUpdateCounters(cacheId)); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java index c06d773..758818d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsFullMessage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; @@ -48,6 +49,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** */ private byte[] partsBytes; + /** Partitions update counters. */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, Map<Integer, Long>> partCntrs = new HashMap<>(); + + /** Serialized partitions counters. */ + private byte[] partCntrsBytes; + /** Topology version. */ private AffinityTopologyVersion topVer; @@ -92,13 +101,34 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa parts.put(cacheId, fullMap); } - /** {@inheritDoc} - * @param ctx*/ + /** + * @param cacheId Cache ID. + * @param cntrMap Partition update counters. + */ + public void addPartitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) { + if (!partCntrs.containsKey(cacheId)) + partCntrs.put(cacheId, cntrMap); + } + + /** + * @param cacheId Cache ID. + * @return Partition update counters. + */ + public Map<Integer, Long> partitionUpdateCounters(int cacheId) { + Map<Integer, Long> res = partCntrs.get(cacheId); + + return res != null ? res : Collections.<Integer, Long>emptyMap(); + } + + /** {@inheritDoc} */ @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { super.prepareMarshal(ctx); if (parts != null && partsBytes == null) partsBytes = ctx.marshaller().marshal(parts); + + if (partCntrs != null) + partCntrsBytes = ctx.marshaller().marshal(partCntrs); } /** @@ -121,6 +151,9 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); + + if (partCntrsBytes != null) + partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr); } /** {@inheritDoc} */ @@ -139,12 +172,18 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (writer.state()) { case 5: - if (!writer.writeByteArray("partsBytes", partsBytes)) + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) return false; writer.incrementState(); case 6: + if (!writer.writeByteArray("partsBytes", partsBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeMessage("topVer", topVer)) return false; @@ -167,7 +206,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa switch (reader.state()) { case 5: - partsBytes = reader.readByteArray("partsBytes"); + partCntrsBytes = reader.readByteArray("partCntrsBytes"); if (!reader.isLastRead()) return false; @@ -175,6 +214,14 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa reader.incrementState(); case 6: + partsBytes = reader.readByteArray("partsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: topVer = reader.readMessage("topVer"); if (!reader.isLastRead()) @@ -194,7 +241,7 @@ public class GridDhtPartitionsFullMessage extends GridDhtPartitionsAbstractMessa /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java index 83fbb1a..547c0f6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsSingleMessage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; import java.io.Externalizable; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.ignite.IgniteCheckedException; @@ -46,6 +47,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** Serialized partitions. */ private byte[] partsBytes; + /** Partitions update counters. */ + @GridToStringInclude + @GridDirectTransient + private Map<Integer, Map<Integer, Long>> partCntrs = new HashMap<>(); + + /** Serialized partitions counters. */ + private byte[] partCntrsBytes; + /** */ private boolean client; @@ -90,6 +99,24 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes } /** + * @param cacheId Cache ID. + * @param cntrMap Partition update counters. + */ + public void partitionUpdateCounters(int cacheId, Map<Integer, Long> cntrMap) { + partCntrs.put(cacheId, cntrMap); + } + + /** + * @param cacheId Cache ID. + * @return Partition update counters. + */ + public Map<Integer, Long> partitionUpdateCounters(int cacheId) { + Map<Integer, Long> res = partCntrs.get(cacheId); + + return res != null ? res : Collections.<Integer, Long>emptyMap(); + } + + /** * @return Local partitions. */ public Map<Integer, GridDhtPartitionMap> partitions() { @@ -103,6 +130,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partsBytes == null && parts != null) partsBytes = ctx.marshaller().marshal(parts); + + if (partCntrs != null) + partCntrsBytes = ctx.marshaller().marshal(partCntrs); } /** {@inheritDoc} */ @@ -111,6 +141,9 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes if (partsBytes != null && parts == null) parts = ctx.marshaller().unmarshal(partsBytes, ldr); + + if (partCntrsBytes != null) + partCntrs = ctx.marshaller().unmarshal(partCntrsBytes, ldr); } /** {@inheritDoc} */ @@ -135,6 +168,12 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes writer.incrementState(); case 6: + if (!writer.writeByteArray("partCntrsBytes", partCntrsBytes)) + return false; + + writer.incrementState(); + + case 7: if (!writer.writeByteArray("partsBytes", partsBytes)) return false; @@ -165,6 +204,14 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes reader.incrementState(); case 6: + partCntrsBytes = reader.readByteArray("partCntrsBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: partsBytes = reader.readByteArray("partsBytes"); if (!reader.isLastRead()) @@ -184,7 +231,7 @@ public class GridDhtPartitionsSingleMessage extends GridDhtPartitionsAbstractMes /** {@inheritDoc} */ @Override public byte fieldsCount() { - return 7; + return 8; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 1bf03a9..f8bb8fb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -249,7 +249,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*write-through*/false, /*read-through*/false, /*retval*/false, - /**expiry policy*/null, + /*expiry policy*/null, /*event*/true, /*metrics*/true, /*primary*/false, @@ -263,7 +263,9 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { false, false, subjId, - taskName); + taskName, + null, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); @@ -351,7 +353,7 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { /*event*/true, /*metrics*/true, /*primary*/false, - /*check version*/!req.forceTransformBackups(), + /*check version*/op != TRANSFORM || !req.forceTransformBackups(), req.topologyVersion(), CU.empty0(), DR_NONE, @@ -359,9 +361,11 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { expireTime, null, false, - intercept, + /*intercept*/false, req.subjectId(), - taskName); + taskName, + null, + null); if (updRes.removeVersion() != null) ctx.onDeferredDelete(entry, updRes.removeVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java index 9ea9b73..470aa09 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryEntry.java @@ -159,6 +159,13 @@ public class CacheContinuousQueryEntry implements GridCacheDeployable, Message { } /** + * @return Filtered entry. + */ + boolean filtered() { + return false; + } + + /** * @param cctx Cache context. * @throws IgniteCheckedException In case of error. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/0408ed87/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java new file mode 100644 index 0000000..14d8f51 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryFilteredEntry.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.query.continuous; + +import java.nio.ByteBuffer; +import javax.cache.event.EventType; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * Continuous query entry. + */ +public class CacheContinuousQueryFilteredEntry extends CacheContinuousQueryEntry { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private EventType evtType; + + /** Cache name. */ + private int cacheId; + + /** Partition. */ + private int part; + + /** Update index. */ + private long updateIdx; + + /** */ + @GridToStringInclude + @GridDirectTransient + private AffinityTopologyVersion topVer; + + /** + * Required by {@link Message}. + */ + public CacheContinuousQueryFilteredEntry() { + // No-op. + } + + /** + * @param e Cache continuous query entry. + */ + CacheContinuousQueryFilteredEntry(CacheContinuousQueryEntry e) { + this.cacheId = e.cacheId(); + this.evtType = e.eventType(); + this.part = e.partition(); + this.updateIdx = e.updateIndex(); + this.topVer = e.topologyVersion(); + } + + /** + * @return Topology version if applicable. + */ + @Nullable AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Cache ID. + */ + int cacheId() { + return cacheId; + } + + /** + * @return Event type. + */ + EventType eventType() { + return evtType; + } + + /** + * @return Partition. + */ + int partition() { + return part; + } + + /** + * @return Update index. + */ + long updateIndex() { + return updateIdx; + } + + /** {@inheritDoc} */ + @Override boolean filtered() { + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 115; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeInt("cacheId", cacheId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByte("evtType", evtType != null ? (byte)evtType.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeInt("part", part)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeLong("updateIdx", updateIdx)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + cacheId = reader.readInt("cacheId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + byte evtTypeOrd; + + evtTypeOrd = reader.readByte("evtType"); + + if (!reader.isLastRead()) + return false; + + evtType = CacheContinuousQueryEntry.eventTypeFromOrdinal(evtTypeOrd); + + reader.incrementState(); + + case 2: + part = reader.readInt("part"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + updateIdx = reader.readLong("updateIdx"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(CacheContinuousQueryFilteredEntry.class); + } + + /** {@inheritDoc} */ + @Override void prepareMarshal(GridCacheContext cctx) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override void unmarshal(GridCacheContext cctx, @Nullable ClassLoader ldr) throws IgniteCheckedException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(CacheContinuousQueryFilteredEntry.class, this); + } +} \ No newline at end of file
