Updated Branches:
  refs/heads/trunk 7b3b3a099 -> 02fb21d93

GIRAPH-506: Concurrency issue - response can arrive before request is added to 
the outstanding map (majakabiljo)


Project: http://git-wip-us.apache.org/repos/asf/giraph/repo
Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/02fb21d9
Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/02fb21d9
Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/02fb21d9

Branch: refs/heads/trunk
Commit: 02fb21d9311d6c7625b68534937b4bc7212a73ef
Parents: 7b3b3a0
Author: Maja Kabiljo <[email protected]>
Authored: Wed Feb 6 15:35:14 2013 -0800
Committer: Maja Kabiljo <[email protected]>
Committed: Wed Feb 6 15:51:21 2013 -0800

----------------------------------------------------------------------
 CHANGELOG                                          |    2 ++
 .../org/apache/giraph/comm/netty/NettyClient.java  |    8 ++++++--
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/giraph/blob/02fb21d9/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index 405d729..05bcf91 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,6 +1,8 @@
 Giraph Change Log
 
 Release 0.2.0 - unreleased
+  GIRAPH-506: Concurrency issue - response can arrive before request is added 
to the outstanding map (majakabiljo)
+
   GIRAPH-501: WorkerObserver (nitay)
 
   GIRAPH-502: In PageRankBenchmark, remove unneeded handling of -t 2 (ekoontz)

http://git-wip-us.apache.org/repos/asf/giraph/blob/02fb21d9/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
----------------------------------------------------------------------
diff --git 
a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java 
b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
index 76d38e2..89ef87f 100644
--- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
+++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java
@@ -644,8 +644,6 @@ public class NettyClient {
         addressRequestIdGenerator.getNextRequestId(remoteServer));
       ClientRequestId clientRequestId =
         new ClientRequestId(destTaskId, request.getRequestId());
-      ChannelFuture writeFuture = channel.write(request);
-      newRequestInfo.setWriteFuture(writeFuture);
       RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(
         clientRequestId, newRequestInfo);
       if (oldRequestInfo != null) {
@@ -654,6 +652,8 @@ public class NettyClient {
           "request info of " + oldRequestInfo);
       }
     }
+    ChannelFuture writeFuture = channel.write(request);
+    newRequestInfo.setWriteFuture(writeFuture);
 
     if (limitNumberOfOpenRequests &&
         clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {
@@ -739,6 +739,10 @@ public class NettyClient {
         clientRequestIdRequestInfoMap.entrySet()) {
       RequestInfo requestInfo = entry.getValue();
       ChannelFuture writeFuture = requestInfo.getWriteFuture();
+      // Request wasn't sent yet
+      if (writeFuture == null) {
+        continue;
+      }
       // If not connected anymore, request failed, or the request is taking
       // too long, re-establish and resend
       if (!writeFuture.getChannel().isConnected() ||

Reply via email to