Updated Branches: refs/heads/trunk 62c12fa0b -> b022dce9f
GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman) Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/b022dce9 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/b022dce9 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/b022dce9 Branch: refs/heads/trunk Commit: b022dce9f32f2f48015a2b1489e4d72f08916bab Parents: 62c12fa Author: Eli Reisman <[email protected]> Authored: Tue Feb 5 10:08:15 2013 -0800 Committer: Eli Reisman <[email protected]> Committed: Tue Feb 5 10:08:15 2013 -0800 ---------------------------------------------------------------------- CHANGELOG | 2 + .../org/apache/giraph/comm/netty/NettyClient.java | 115 +++++++++------ 2 files changed, 70 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/b022dce9/CHANGELOG ---------------------------------------------------------------------- diff --git a/CHANGELOG b/CHANGELOG index 2d25746..d50e1e5 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,6 +1,8 @@ Giraph Change Log Release 0.2.0 - unreleased + GIRAPH-497: Limiting number of open requests doesn't work with multithreading (majakabiljo via ereisman) + GIRAPH-461: Convert static assignment of in-memory partitions with LRU cache (claudio) GIRAPH-494: Make Edge an interface (nitay) http://git-wip-us.apache.org/repos/asf/giraph/blob/b022dce9/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index ed92d82..76d38e2 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -65,6 +65,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import static org.jboss.netty.channel.Channels.pipeline; @@ -150,6 +151,9 @@ public class NettyClient { private final ExecutionHandler executionHandler; /** Name of the handler before the execution handler (if used) */ private final String handlerBeforeExecutionHandler; + /** When was the last time we checked if we should resend some requests */ + private final AtomicLong lastTimeCheckedRequestsForProblems = + new AtomicLong(0); /** * Only constructor @@ -640,6 +644,8 @@ public class NettyClient { addressRequestIdGenerator.getNextRequestId(remoteServer)); ClientRequestId clientRequestId = new ClientRequestId(destTaskId, request.getRequestId()); + ChannelFuture writeFuture = channel.write(request); + newRequestInfo.setWriteFuture(writeFuture); RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent( clientRequestId, newRequestInfo); if (oldRequestInfo != null) { @@ -648,8 +654,6 @@ public class NettyClient { "request info of " + oldRequestInfo); } } - ChannelFuture writeFuture = channel.write(request); - newRequestInfo.setWriteFuture(writeFuture); if (limitNumberOfOpenRequests && clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) { @@ -679,9 +683,6 @@ public class NettyClient { * complete */ private void waitSomeRequests(int maxOpenRequests) { - List<ClientRequestId> addedRequestIds = Lists.newArrayList(); - List<RequestInfo> addedRequestInfos = Lists.newArrayList(); - while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) { // Wait for requests to complete for some time if (LOG.isInfoEnabled() && requestLogger.isPrintable()) { @@ -712,53 +713,73 @@ public class NettyClient { // Make sure that waiting doesn't kill the job context.progress(); - // Check all the requests for problems - for (Map.Entry<ClientRequestId, RequestInfo> entry : - clientRequestIdRequestInfoMap.entrySet()) { - RequestInfo requestInfo = entry.getValue(); - ChannelFuture writeFuture = requestInfo.getWriteFuture(); - // If not connected anymore, request failed, or the request is taking - // too long, re-establish and resend - if (!writeFuture.getChannel().isConnected() || - (writeFuture.isDone() && !writeFuture.isSuccess()) || - (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) { - LOG.warn("waitSomeRequests: Problem with request id " + - entry.getKey() + " connected = " + - writeFuture.getChannel().isConnected() + - ", future done = " + writeFuture.isDone() + ", " + - "success = " + writeFuture.isSuccess() + ", " + - "cause = " + writeFuture.getCause() + ", " + - "elapsed time = " + requestInfo.getElapsedMsecs() + ", " + - "destination = " + writeFuture.getChannel().getRemoteAddress() + - " " + requestInfo); - addedRequestIds.add(entry.getKey()); - addedRequestInfos.add(new RequestInfo( - requestInfo.getDestinationAddress(), requestInfo.getRequest())); - } + checkRequestsForProblems(); + } + } + + /** + * Check if there are some open requests which have been sent a long time + * ago, and if so resend them. + */ + private void checkRequestsForProblems() { + long lastTimeChecked = lastTimeCheckedRequestsForProblems.get(); + // If not enough time passed from the previous check, return + if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) { + return; + } + // If another thread did the check already, return + if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked, + System.currentTimeMillis())) { + return; + } + List<ClientRequestId> addedRequestIds = Lists.newArrayList(); + List<RequestInfo> addedRequestInfos = Lists.newArrayList(); + // Check all the requests for problems + for (Map.Entry<ClientRequestId, RequestInfo> entry : + clientRequestIdRequestInfoMap.entrySet()) { + RequestInfo requestInfo = entry.getValue(); + ChannelFuture writeFuture = requestInfo.getWriteFuture(); + // If not connected anymore, request failed, or the request is taking + // too long, re-establish and resend + if (!writeFuture.getChannel().isConnected() || + (writeFuture.isDone() && !writeFuture.isSuccess()) || + (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) { + LOG.warn("checkRequestsForProblems: Problem with request id " + + entry.getKey() + " connected = " + + writeFuture.getChannel().isConnected() + + ", future done = " + writeFuture.isDone() + ", " + + "success = " + writeFuture.isSuccess() + ", " + + "cause = " + writeFuture.getCause() + ", " + + "elapsed time = " + requestInfo.getElapsedMsecs() + ", " + + "destination = " + writeFuture.getChannel().getRemoteAddress() + + " " + requestInfo); + addedRequestIds.add(entry.getKey()); + addedRequestInfos.add(new RequestInfo( + requestInfo.getDestinationAddress(), requestInfo.getRequest())); } + } - // Add any new requests to the system, connect if necessary, and re-send - for (int i = 0; i < addedRequestIds.size(); ++i) { - ClientRequestId requestId = addedRequestIds.get(i); - RequestInfo requestInfo = addedRequestInfos.get(i); + // Add any new requests to the system, connect if necessary, and re-send + for (int i = 0; i < addedRequestIds.size(); ++i) { + ClientRequestId requestId = addedRequestIds.get(i); + RequestInfo requestInfo = addedRequestInfos.get(i); - if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) == - null) { - LOG.warn("waitSomeRequests: Request " + requestId + - " completed prior to sending the next request"); - clientRequestIdRequestInfoMap.remove(requestId); - } - InetSocketAddress remoteServer = requestInfo.getDestinationAddress(); - Channel channel = getNextChannel(remoteServer); - if (LOG.isInfoEnabled()) { - LOG.info("waitSomeRequests: Re-issuing request " + requestInfo); - } - ChannelFuture writeFuture = channel.write(requestInfo.getRequest()); - requestInfo.setWriteFuture(writeFuture); + if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) == + null) { + LOG.warn("checkRequestsForProblems: Request " + requestId + + " completed prior to sending the next request"); + clientRequestIdRequestInfoMap.remove(requestId); + } + InetSocketAddress remoteServer = requestInfo.getDestinationAddress(); + Channel channel = getNextChannel(remoteServer); + if (LOG.isInfoEnabled()) { + LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo); } - addedRequestIds.clear(); - addedRequestInfos.clear(); + ChannelFuture writeFuture = channel.write(requestInfo.getRequest()); + requestInfo.setWriteFuture(writeFuture); } + addedRequestIds.clear(); + addedRequestInfos.clear(); } /**
