Repository: ignite Updated Branches: refs/heads/ignite-1124-debug 610cc88fd -> f41195b1c
ignite-1124-debug Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f41195b1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f41195b1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f41195b1 Branch: refs/heads/ignite-1124-debug Commit: f41195b1c1db1d4cf19e87b6a4fb3dff92f71df0 Parents: 610cc88 Author: sboikov <[email protected]> Authored: Mon Aug 24 16:59:28 2015 +0300 Committer: sboikov <[email protected]> Committed: Mon Aug 24 16:59:28 2015 +0300 ---------------------------------------------------------------------- .../dht/atomic/GridNearAtomicUpdateFuture.java | 82 ++++++++++++++++++-- 1 file changed, 77 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f41195b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index d0c410c..cd974fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -39,6 +39,7 @@ import org.jetbrains.annotations.*; import org.jsr166.*; import javax.cache.expiry.*; +import java.text.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; @@ -278,10 +279,14 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { + debug("onNodeLeft " + nodeId); + Boolean single0 = single; if (single0 != null && single0) { if (singleNodeId.equals(nodeId)) { + debug("onNodeLeft single handle " + nodeId); + onDone(addFailedKeys( singleReq.keys(), singleReq.topologyVersion(), @@ -290,12 +295,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return true; } + debug("onNodeLeft single skip " + nodeId); + return false; } GridNearAtomicUpdateRequest req = mappings.get(nodeId); if (req != null) { + debug("onNodeLeft handle " + nodeId); + addFailedKeys(req.keys(), req.topologyVersion(), new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)); @@ -307,6 +316,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return true; } + debug("onNodeLeft skip " + nodeId); + return false; } @@ -372,6 +383,32 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return null; } + @GridToStringInclude + private List<DebugInfo> debug = Collections.synchronizedList(new ArrayList<DebugInfo>(20)); + + /** */ + private static final SimpleDateFormat DEBUG_DATE_FMT = new SimpleDateFormat("HH:mm:ss,SSS"); + + private static class DebugInfo { + String thread = Thread.currentThread().getName(); + + String msg; + + long ts = U.currentTimeMillis(); + + public DebugInfo(String msg) { + this.msg = msg; + } + + public String toString() { + return "Debug [thread=" + thread + ", time=" + DEBUG_DATE_FMT.format(new Date(ts)) + ", msg=" + msg + ']'; + } + } + + private void debug(String msg) { + debug.add(new DebugInfo(msg)); + } + /** * @param failed Keys to remap. * @param errTopVer Topology version for failed update. @@ -384,6 +421,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null) return; + debug("remap " + errTopVer); + Collection<Object> remapKeys = new ArrayList<>(failed.size()); Collection<Object> remapVals = vals != null ? new ArrayList<>(failed.size()) : null; Collection<GridCacheDrInfo> remapConflictPutVals = conflictPutVals != null ? new ArrayList<GridCacheDrInfo>(failed.size()) : null; @@ -531,11 +570,15 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> * @param res Update response. */ public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) { + debug("response " + res + ", node=" + nodeId); + if (res.remapKeys() != null) { assert !fastMap || cctx.kernalContext().clientNode(); Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys(); + debug("response remap, node=" + nodeId); + mapOnTopology(remapKeys, true, nodeId); return; @@ -552,6 +595,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> updateNear(singleReq, res); if (res.error() != null) { + debug("response single error, node=" + nodeId); + onDone(res.failedKeys() != null ? addFailedKeys(res.failedKeys(), singleReq.topologyVersion(), res.error()) : res.error()); } @@ -565,6 +610,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> else { GridCacheReturn opRes0 = opRes = ret; + debug("response single done, node=" + nodeId); + onDone(opRes0); } } @@ -575,8 +622,11 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft. updateNear(req, res); - if (res.error() != null) + if (res.error() != null) { + debug("response error, node=" + nodeId); + addFailedKeys(req.keys(), req.topologyVersion(), res.error()); + } else { if (op == TRANSFORM) { assert !req.fastMap(); @@ -590,6 +640,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> mappings.remove(nodeId); } + else + debug("response no mapping, node=" + nodeId); checkComplete(); } @@ -720,6 +772,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> @Nullable UUID oldNodeId) { assert oldNodeId == null || remap || fastMapRemap; + debug("map0 " + topVer + " " + remap + " " + oldNodeId + " " + remapKeys); + Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); if (F.isEmpty(topNodes)) { @@ -817,6 +871,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return; } + debug("mapSingle to " + primary.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + " " + topVer); + GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( cctx.cacheId(), primary.id(), @@ -1039,10 +1095,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> ) { GridCacheAffinityManager affMgr = cctx.affinity(); - // If we can send updates in parallel - do it. - return fastMap ? - cctx.topology().nodes(affMgr.partition(key), topVer) : - Collections.singletonList(affMgr.primary(key, topVer)); + if (fastMap) { + return cctx.topology().nodes(affMgr.partition(key), topVer); + } + else { + ClusterNode primary = affMgr.primary(key, topVer); + + debug("map to " + primary.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + " " + topVer); + + return Collections.singletonList(affMgr.primary(key, topVer)); + } } /** @@ -1056,6 +1118,8 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> singleReq = req; if (cctx.localNodeId().equals(nodeId)) { + debug("mapSingle local"); + cache.updateAllAsyncInternal(nodeId, req, new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { @Override public void apply(GridNearAtomicUpdateRequest req, @@ -1071,12 +1135,16 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + debug("mapSingle to " + req.nodeId()); + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) onDone(new GridCacheReturn(cctx, true, null, true)); } catch (IgniteCheckedException e) { + debug("mapSingle err " + req.topologyLocked()); + onDone(addFailedKeys(req.keys(), req.topologyVersion(), e)); } } @@ -1105,9 +1173,13 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> if (log.isDebugEnabled()) log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); + debug("send to " + req.nodeId()); + cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); } catch (IgniteCheckedException e) { + debug("send err " + req.nodeId()); + addFailedKeys(req.keys(), req.topologyVersion(), e); removeMapping(req.nodeId());
