Updated Branches: refs/heads/trunk 7b3b3a099 -> 02fb21d93
GIRAPH-506: Concurrency issue - response can arrive before request is added to the outstanding map (majakabiljo) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/02fb21d9 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/02fb21d9 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/02fb21d9 Branch: refs/heads/trunk Commit: 02fb21d9311d6c7625b68534937b4bc7212a73ef Parents: 7b3b3a0 Author: Maja Kabiljo <[email protected]> Authored: Wed Feb 6 15:35:14 2013 -0800 Committer: Maja Kabiljo <[email protected]> Committed: Wed Feb 6 15:51:21 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 ++ .../org/apache/giraph/comm/netty/NettyClient.java | 8 ++++++-- 2 files changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/02fb21d9/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 405d729..05bcf91 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-506: Concurrency issue - response can arrive before request is added to the outstanding map (majakabiljo) + GIRAPH-501: WorkerObserver (nitay) GIRAPH-502: In PageRankBenchmark, remove unneeded handling of -t 2 (ekoontz) http://git-wip-us.apache.org/repos/asf/giraph/blob/02fb21d9/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index 76d38e2..89ef87f 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -644,8 +644,6 @@ public class NettyClient { addressRequestIdGenerator.getNextRequestId(remoteServer)); ClientRequestId clientRequestId = new ClientRequestId(destTaskId, request.getRequestId()); - ChannelFuture writeFuture = channel.write(request); - newRequestInfo.setWriteFuture(writeFuture); RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent( clientRequestId, newRequestInfo); if (oldRequestInfo != null) { @@ -654,6 +652,8 @@ public class NettyClient { "request info of " + oldRequestInfo); } } + ChannelFuture writeFuture = channel.write(request); + newRequestInfo.setWriteFuture(writeFuture); if (limitNumberOfOpenRequests && clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) { @@ -739,6 +739,10 @@ public class NettyClient { clientRequestIdRequestInfoMap.entrySet()) { RequestInfo requestInfo = entry.getValue(); ChannelFuture writeFuture = requestInfo.getWriteFuture(); + // Request wasn't sent yet + if (writeFuture == null) { + continue; + } // If not connected anymore, request failed, or the request is taking // too long, re-establish and resend if (!writeFuture.getChannel().isConnected() ||
