Repository: ignite
Updated Branches:
  refs/heads/ignite-1124-debug 610cc88fd -> f41195b1c


ignite-1124-debug


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f41195b1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f41195b1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f41195b1

Branch: refs/heads/ignite-1124-debug
Commit: f41195b1c1db1d4cf19e87b6a4fb3dff92f71df0
Parents: 610cc88
Author: sboikov <[email protected]>
Authored: Mon Aug 24 16:59:28 2015 +0300
Committer: sboikov <[email protected]>
Committed: Mon Aug 24 16:59:28 2015 +0300

----------------------------------------------------------------------
 .../dht/atomic/GridNearAtomicUpdateFuture.java  | 82 ++++++++++++++++++--
 1 file changed, 77 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/f41195b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
index d0c410c..cd974fc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java
@@ -39,6 +39,7 @@ import org.jetbrains.annotations.*;
 import org.jsr166.*;
 
 import javax.cache.expiry.*;
+import java.text.*;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
@@ -278,10 +279,14 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
     /** {@inheritDoc} */
     @Override public boolean onNodeLeft(UUID nodeId) {
+        debug("onNodeLeft " + nodeId);
+
         Boolean single0 = single;
 
         if (single0 != null && single0) {
             if (singleNodeId.equals(nodeId)) {
+                debug("onNodeLeft single handle " + nodeId);
+
                 onDone(addFailedKeys(
                     singleReq.keys(),
                     singleReq.topologyVersion(),
@@ -290,12 +295,16 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 return true;
             }
 
+            debug("onNodeLeft single skip " + nodeId);
+
             return false;
         }
 
         GridNearAtomicUpdateRequest req = mappings.get(nodeId);
 
         if (req != null) {
+            debug("onNodeLeft handle " + nodeId);
+
             addFailedKeys(req.keys(),
                 req.topologyVersion(),
                 new ClusterTopologyCheckedException("Primary node left grid 
before response is received: " + nodeId));
@@ -307,6 +316,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             return true;
         }
 
+        debug("onNodeLeft skip " + nodeId);
+
         return false;
     }
 
@@ -372,6 +383,32 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         return null;
     }
 
+    @GridToStringInclude
+    private List<DebugInfo> debug = Collections.synchronizedList(new 
ArrayList<DebugInfo>(20));
+
+    /** */
+    private static final SimpleDateFormat DEBUG_DATE_FMT = new 
SimpleDateFormat("HH:mm:ss,SSS");
+
+    private static class DebugInfo {
+        String thread = Thread.currentThread().getName();
+
+        String msg;
+
+        long ts = U.currentTimeMillis();
+
+        public DebugInfo(String msg) {
+            this.msg = msg;
+        }
+
+        public String toString() {
+            return "Debug [thread=" + thread + ", time=" + 
DEBUG_DATE_FMT.format(new Date(ts)) + ", msg=" + msg + ']';
+        }
+    }
+
+    private void debug(String msg) {
+        debug.add(new DebugInfo(msg));
+    }
+
     /**
      * @param failed Keys to remap.
      * @param errTopVer Topology version for failed update.
@@ -384,6 +421,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         if (futVer0 == null || cctx.mvcc().removeAtomicFuture(futVer0) == null)
             return;
 
+        debug("remap " + errTopVer);
+
         Collection<Object> remapKeys = new ArrayList<>(failed.size());
         Collection<Object> remapVals = vals != null ? new 
ArrayList<>(failed.size()) : null;
         Collection<GridCacheDrInfo> remapConflictPutVals = conflictPutVals != 
null ? new ArrayList<GridCacheDrInfo>(failed.size()) : null;
@@ -531,11 +570,15 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
      * @param res Update response.
      */
     public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
+        debug("response " + res + ", node=" + nodeId);
+
         if (res.remapKeys() != null) {
             assert !fastMap || cctx.kernalContext().clientNode();
 
             Collection<KeyCacheObject> remapKeys = fastMap ? null : 
res.remapKeys();
 
+            debug("response remap, node=" + nodeId);
+
             mapOnTopology(remapKeys, true, nodeId);
 
             return;
@@ -552,6 +595,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             updateNear(singleReq, res);
 
             if (res.error() != null) {
+                debug("response single error, node=" + nodeId);
+
                 onDone(res.failedKeys() != null ?
                     addFailedKeys(res.failedKeys(), 
singleReq.topologyVersion(), res.error()) : res.error());
             }
@@ -565,6 +610,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 else {
                     GridCacheReturn opRes0 = opRes = ret;
 
+                    debug("response single done, node=" + nodeId);
+
                     onDone(opRes0);
                 }
             }
@@ -575,8 +622,11 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
             if (req != null) { // req can be null if onResult is being 
processed concurrently with onNodeLeft.
                 updateNear(req, res);
 
-                if (res.error() != null)
+                if (res.error() != null) {
+                    debug("response error, node=" + nodeId);
+
                     addFailedKeys(req.keys(), req.topologyVersion(), 
res.error());
+                }
                 else {
                     if (op == TRANSFORM) {
                         assert !req.fastMap();
@@ -590,6 +640,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
 
                 mappings.remove(nodeId);
             }
+            else
+                debug("response no mapping, node=" + nodeId);
 
             checkComplete();
         }
@@ -720,6 +772,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         @Nullable UUID oldNodeId) {
         assert oldNodeId == null || remap || fastMapRemap;
 
+        debug("map0 " + topVer + " " + remap + " " + oldNodeId + " " + 
remapKeys);
+
         Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
 
         if (F.isEmpty(topNodes)) {
@@ -817,6 +871,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 return;
             }
 
+            debug("mapSingle to " + 
primary.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + " " + topVer);
+
             GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
                 cctx.cacheId(),
                 primary.id(),
@@ -1039,10 +1095,16 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
     ) {
         GridCacheAffinityManager affMgr = cctx.affinity();
 
-        // If we can send updates in parallel - do it.
-        return fastMap ?
-            cctx.topology().nodes(affMgr.partition(key), topVer) :
-            Collections.singletonList(affMgr.primary(key, topVer));
+        if (fastMap) {
+            return cctx.topology().nodes(affMgr.partition(key), topVer);
+        }
+        else {
+            ClusterNode primary = affMgr.primary(key, topVer);
+
+            debug("map to " + 
primary.attribute(IgniteNodeAttributes.ATTR_GRID_NAME) + " " + topVer);
+
+            return Collections.singletonList(affMgr.primary(key, topVer));
+        }
     }
 
     /**
@@ -1056,6 +1118,8 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
         singleReq = req;
 
         if (cctx.localNodeId().equals(nodeId)) {
+            debug("mapSingle local");
+
             cache.updateAllAsyncInternal(nodeId, req,
                 new CI2<GridNearAtomicUpdateRequest, 
GridNearAtomicUpdateResponse>() {
                     @Override public void apply(GridNearAtomicUpdateRequest 
req,
@@ -1071,12 +1135,16 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                 if (log.isDebugEnabled())
                     log.debug("Sending near atomic update request [nodeId=" + 
req.nodeId() + ", req=" + req + ']');
 
+                debug("mapSingle to " + req.nodeId());
+
                 cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
 
                 if (syncMode == FULL_ASYNC && 
cctx.config().getAtomicWriteOrderMode() == PRIMARY)
                     onDone(new GridCacheReturn(cctx, true, null, true));
             }
             catch (IgniteCheckedException e) {
+                debug("mapSingle err " + req.topologyLocked());
+
                 onDone(addFailedKeys(req.keys(), req.topologyVersion(), e));
             }
         }
@@ -1105,9 +1173,13 @@ public class GridNearAtomicUpdateFuture extends 
GridFutureAdapter<Object>
                     if (log.isDebugEnabled())
                         log.debug("Sending near atomic update request 
[nodeId=" + req.nodeId() + ", req=" + req + ']');
 
+                    debug("send to " + req.nodeId());
+
                     cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
                 }
                 catch (IgniteCheckedException e) {
+                    debug("send err " + req.nodeId());
+
                     addFailedKeys(req.keys(), req.topologyVersion(), e);
 
                     removeMapping(req.nodeId());

Reply via email to