5578
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/18f4929a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/18f4929a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/18f4929a Branch: refs/heads/ignite-5578 Commit: 18f4929a615ca20c3107a1a76be8e2de6b78ad73 Parents: da0c884 Author: sboikov <[email protected]> Authored: Thu Jul 27 15:27:07 2017 +0300 Committer: sboikov <[email protected]> Committed: Thu Jul 27 15:27:07 2017 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridDhtAtomicCache.java | 125 ++++++++++++++----- 1 file changed, 94 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/18f4929a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index be4aace..75d060f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -38,6 +38,7 @@ import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.pagemem.wal.StorageException; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -61,6 +62,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; @@ -106,6 +108,7 @@ import org.apache.ignite.lang.IgniteInClosure; import org.apache.ignite.lang.IgniteRunnable; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.plugin.security.SecurityPermission; +import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.transactions.TransactionIsolation; import org.jetbrains.annotations.Nullable; @@ -1683,7 +1686,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } - updateAllAsyncInternal0(node, req, completionCb); + for (;;) { + if (updateAllAsyncInternal0(node, req, completionCb)) + break; + } } else { forceFut.listen(new CI1<IgniteInternalFuture<Object>>() { @@ -1700,7 +1706,10 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return; } - updateAllAsyncInternal0(node, req, completionCb); + for (;;) { + if (updateAllAsyncInternal0(node, req, completionCb)) + break; + } } }); } @@ -1735,12 +1744,16 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @param node Node. * @param req Update request. * @param completionCb Completion callback. + * @return {@code True} if update was executed, {@code false} if need retry update. */ - private void updateAllAsyncInternal0( + private boolean updateAllAsyncInternal0( ClusterNode node, GridNearAtomicAbstractUpdateRequest req, UpdateReplyClosure completionCb ) { + if (waitForTopologyFuture(node, req, completionCb)) + return true; + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), node.id(), req.futureId(), @@ -1750,9 +1763,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1); - GridDhtAtomicAbstractUpdateFuture dhtFut = null; + GridDhtTopologyFuture topFut = null; - boolean remap = false; + GridDhtAtomicAbstractUpdateFuture dhtFut = null; IgniteCacheExpiryPolicy expiry = null; @@ -1778,23 +1791,26 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { completionCb.apply(req, res); - return; + return true; } - // Do not check topology version if topology was locked on near node by - // external transaction or explicit lock. - if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) { - DhtAtomicUpdateResult updRes = update(node, locked, req, res); + topFut = top.topologyVersionFuture(); - dhtFut = updRes.dhtFuture(); - deleted = updRes.deleted(); - expiry = updRes.expiryPolicy(); - } - else { - // Should remap all keys. - remap = true; + if (topFut.isDone()) { + topFut = null; + + // Do not check topology version if topology was locked on near node by + // external transaction or explicit lock. + if (req.topologyLocked() || !needRemap(req.topologyVersion(), top.topologyVersion())) { + DhtAtomicUpdateResult updRes = update(node, locked, req, res); - res.remapTopologyVersion(top.topologyVersion()); + dhtFut = updRes.dhtFuture(); + deleted = updRes.deleted(); + expiry = updRes.expiryPolicy(); + } + else + // Should remap all keys. + res.remapTopologyVersion(top.topologyVersion()); } } finally { @@ -1829,8 +1845,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (log.isDebugEnabled()) log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req); - remap = true; - res.remapTopologyVersion(ctx.topology().topologyVersion()); } catch (Throwable e) { @@ -1845,26 +1859,75 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (e instanceof Error) throw (Error)e; - return; + return true; } finally { ctx.shared().database().checkpointReadUnlock(); } - if (remap) { - assert dhtFut == null; + if (topFut == null) { + if (res.remapTopologyVersion() != null) { + assert dhtFut == null; - completionCb.apply(req, res); - } - else { - if (dhtFut != null) - dhtFut.map(node, res.returnValue(), res, completionCb); + completionCb.apply(req, res); + } + else { + if (dhtFut != null) + dhtFut.map(node, res.returnValue(), res, completionCb); + } + + if (req.writeSynchronizationMode() != FULL_ASYNC) + req.cleanup(!node.isLocal()); + + sendTtlUpdateRequest(expiry); + + return true; } + else + return waitForTopologyFuture(node, req, completionCb); + } + + /** + * @param node Sender node. + * @param req Request. + * @param completionCb Completion callback. + * @return {@code True} if update will be retried from future listener. + */ + private boolean waitForTopologyFuture(final ClusterNode node, + final GridNearAtomicAbstractUpdateRequest req, + final UpdateReplyClosure completionCb) { + GridDhtTopologyFuture topFut = ctx.group().topology().topologyVersionFuture(); + + if (!topFut.isDone()) { + Thread curThread = Thread.currentThread(); + + if (curThread instanceof IgniteThread) { + IgniteThread thread = (IgniteThread)curThread; + + if (thread.policy() == GridIoPolicy.SYSTEM_POOL) { + topFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + ctx.closures().runLocalSafe(new Runnable() { + @Override public void run() { + updateAllAsyncInternal0(node, req, completionCb); + } + }); + } + }); + + return true; + } + } - if (req.writeSynchronizationMode() != FULL_ASYNC) - req.cleanup(!node.isLocal()); + try { + topFut.get(); + } + catch (IgniteCheckedException e) { + U.error(log, "Topology future failed: " + e, e); + } + } - sendTtlUpdateRequest(expiry); + return false; } /**
