IGNITE-3776: Removed code duplication in GridNearAtomicAbstractUpdateFuture.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/87a1928a Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/87a1928a Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/87a1928a Branch: refs/heads/master Commit: 87a1928a4f90b4f8a221041cfff9d22e3dd801cc Parents: 99e3e8a Author: vozerov-gridgain <[email protected]> Authored: Fri Aug 26 15:22:15 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Fri Aug 26 15:22:15 2016 +0300 ---------------------------------------------------------------------- .../GridNearAtomicAbstractUpdateFuture.java | 69 +++++++++++++++++++ .../GridNearAtomicSingleUpdateFuture.java | 70 +------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 68 +------------------ 3 files changed, 73 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/87a1928a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java index 3e69c02..85751bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicAbstractUpdateFuture.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.cache.CacheWriteSynchronizationMode; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; @@ -29,6 +30,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; @@ -246,4 +248,71 @@ public abstract class GridNearAtomicAbstractUpdateFuture extends GridFutureAdapt protected boolean storeFuture() { return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; } + + /** + * Maps future to single node. + * + * @param nodeId Node ID. + * @param req Request. + */ + protected void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { + if (cctx.localNodeId().equals(nodeId)) { + cache.updateAllAsyncInternal(nodeId, req, + new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { + @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { + onResult(res.nodeId(), res, false); + } + }); + } + else { + try { + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); + + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + req.nodeId() + ']'); + } + + if (syncMode == FULL_ASYNC) + onDone(new GridCacheReturn(cctx, true, true, null, true)); + } + catch (IgniteCheckedException e) { + if (msgLog.isDebugEnabled()) { + msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() + + ", writeVer=" + req.updateVersion() + + ", node=" + req.nodeId() + + ", err=" + e + ']'); + } + + onSendError(req, e); + } + } + } + + /** + * Response callback. + * + * @param nodeId Node ID. + * @param res Update response. + * @param nodeErr {@code True} if response was created on node failure. + */ + public abstract void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr); + + /** + * @param req Request. + * @param e Error. + */ + protected void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { + synchronized (mux) { + GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), + req.nodeId(), + req.futureVersion(), + cctx.deploymentEnabled()); + + res.addFailedKeys(req.keys(), e); + + onResult(req.nodeId(), res, true); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/87a1928a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java index aad4186..661a178 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicSingleUpdateFuture.java @@ -37,7 +37,6 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -52,7 +51,6 @@ import java.util.Map; import java.util.UUID; import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; -import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; /** @@ -191,15 +189,9 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda return false; } - /** - * Response callback. - * - * @param nodeId Node ID. - * @param res Update response. - * @param nodeErr {@code True} if response was created on node failure. - */ + /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { GridNearAtomicUpdateRequest req; AffinityTopologyVersion remapTopVer = null; @@ -441,64 +433,6 @@ public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpda map(topVer); } - /** - * Maps future to single node. - * - * @param nodeId Node ID. - * @param req Request. - */ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { - if (cctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res, false); - } - }); - } - else { - try { - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update single fut, sent request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + - ", node=" + req.nodeId() + ']'); - } - - if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, true, null, true)); - } - catch (IgniteCheckedException e) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update single fut, failed to send request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + - ", node=" + req.nodeId() + - ", err=" + e + ']'); - } - - onSendError(req, e); - } - } - } - - /** - * @param req Request. - * @param e Error. - */ - void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { - synchronized (mux) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - req.nodeId(), - req.futureVersion(), - cctx.deploymentEnabled()); - - res.addFailedKeys(req.keys(), e); - - onResult(req.nodeId(), res, true); - } - } - /** {@inheritDoc} */ protected void map(AffinityTopologyVersion topVer) { Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); http://git-wip-us.apache.org/repos/asf/ignite/blob/87a1928a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index 0d88ef8..2432f63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -253,15 +253,9 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu return false; } - /** - * Response callback. - * - * @param nodeId Node ID. - * @param res Update response. - * @param nodeErr {@code True} if response was created on node failure. - */ + /** {@inheritDoc} */ @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"}) - public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { + @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { GridNearAtomicUpdateRequest req; AffinityTopologyVersion remapTopVer = null; @@ -552,47 +546,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu } /** - * Maps future to single node. - * - * @param nodeId Node ID. - * @param req Request. - */ - private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { - if (cctx.localNodeId().equals(nodeId)) { - cache.updateAllAsyncInternal(nodeId, req, - new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { - @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { - onResult(res.nodeId(), res, false); - } - }); - } - else { - try { - cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); - - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update fut, sent request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + - ", node=" + req.nodeId() + ']'); - } - - if (syncMode == FULL_ASYNC) - onDone(new GridCacheReturn(cctx, true, true, null, true)); - } - catch (IgniteCheckedException e) { - if (msgLog.isDebugEnabled()) { - msgLog.debug("Near update fut, failed to send request [futId=" + req.futureVersion() + - ", writeVer=" + req.updateVersion() + - ", node=" + req.nodeId() + - ", err=" + e + ']'); - } - - onSendError(req, e); - } - } - } - - /** * Sends messages to remote nodes and updates local cache. * * @param mappings Mappings to send. @@ -646,23 +599,6 @@ public class GridNearAtomicUpdateFuture extends GridNearAtomicAbstractUpdateFutu onDone(new GridCacheReturn(cctx, true, true, null, true)); } - /** - * @param req Request. - * @param e Error. - */ - void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { - synchronized (mux) { - GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), - req.nodeId(), - req.futureVersion(), - cctx.deploymentEnabled()); - - res.addFailedKeys(req.keys(), e); - - onResult(req.nodeId(), res, true); - } - } - /** {@inheritDoc} */ protected void map(AffinityTopologyVersion topVer) { map(topVer, null);
