Repository: giraph Updated Branches: refs/heads/trunk fd61fdad3 -> 1c7552b1a
GIRAPH-1058: Fix connection retry logic Summary: Currently, when we fail to connect to a channel, we retry immediately, and that retry usually fails as well. Add a short wait between retries, and improve the check for whether the channel connected successfully. Test Plan: Ran multiple jobs that often failed before the fix; with the fix, they succeeded. Differential Revision: https://reviews.facebook.net/D57447 Project: http://git-wip-us.apache.org/repos/asf/giraph/repo Commit: http://git-wip-us.apache.org/repos/asf/giraph/commit/1c7552b1 Tree: http://git-wip-us.apache.org/repos/asf/giraph/tree/1c7552b1 Diff: http://git-wip-us.apache.org/repos/asf/giraph/diff/1c7552b1 Branch: refs/heads/trunk Commit: 1c7552b1a3c2bbde15f98671c7b7c1424494c128 Parents: fd61fda Author: Maja Kabiljo <[email protected]> Authored: Fri Apr 29 13:23:29 2016 -0700 Committer: Maja Kabiljo <[email protected]> Committed: Fri Apr 29 13:24:57 2016 -0700 ---------------------------------------------------------------------- .../apache/giraph/comm/netty/NettyClient.java | 20 +++++++++++++++++++- .../org/apache/giraph/conf/GiraphConstants.java | 5 +++++ src/site/xdoc/options.xml | 18 ++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java index 863449a..c185fdc 100644 --- a/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java +++ b/giraph-core/src/main/java/org/apache/giraph/comm/netty/NettyClient.java @@ -91,6 +91,7 @@ import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTE import static 
org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS; import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER; import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES; +import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS; import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS; /** @@ -166,6 +167,8 @@ public class NettyClient { private final float requestSizeWarningThreshold; /** Maximum number of connection failures */ private final int maxConnectionFailures; + /** How long to wait before trying to reconnect failed connections */ + private final long waitTimeBetweenConnectionRetriesMs; /** Maximum number of milliseconds for a request */ private final int maxRequestMilliseconds; /** Waiting interval for checking outstanding requests msecs */ @@ -239,6 +242,8 @@ public class NettyClient { maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf); maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf); + waitTimeBetweenConnectionRetriesMs = + WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf); waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf); maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf); maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf); @@ -462,11 +467,24 @@ public class NettyClient { int connected = 0; while (failures < maxConnectionFailures) { List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList(); + boolean isFirstFailure = true; for (ChannelFutureAddress waitingConnection : waitingConnectionList) { context.progress(); ChannelFuture future = waitingConnection.future; ProgressableUtils.awaitChannelFuture(future, context); - if (!future.isSuccess()) { + if (!future.isSuccess() || !future.channel().isOpen()) { + // Make a short pause before trying to reconnect failed addresses + // again, but to do it just once per iterating through channels + if (isFirstFailure) { + 
isFirstFailure = false; + try { + Thread.sleep(waitTimeBetweenConnectionRetriesMs); + } catch (InterruptedException e) { + throw new IllegalStateException( + "connectAllAddresses: InterruptedException occurred", e); + } + } + LOG.warn("connectAllAddresses: Future failed " + "to connect with " + waitingConnection.address + " with " + failures + " failures because of " + future.cause()); http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java ---------------------------------------------------------------------- diff --git a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java index 8335e7e..15eca3c 100644 --- a/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java +++ b/giraph-core/src/main/java/org/apache/giraph/conf/GiraphConstants.java @@ -682,6 +682,11 @@ public interface GiraphConstants { new IntConfOption("giraph.nettyMaxConnectionFailures", 1000, "Netty max connection failures"); + /** How long to wait before trying to reconnect failed connections */ + IntConfOption WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS = + new IntConfOption("giraph.waitTimeBetweenConnectionRetriesMs", 500, + ""); + /** Initial port to start using for the IPC communication */ IntConfOption IPC_INITIAL_PORT = new IntConfOption("giraph.ipcInitialPort", 30000, http://git-wip-us.apache.org/repos/asf/giraph/blob/1c7552b1/src/site/xdoc/options.xml ---------------------------------------------------------------------- diff --git a/src/site/xdoc/options.xml b/src/site/xdoc/options.xml index 687d30f..2735575 100644 --- a/src/site/xdoc/options.xml +++ b/src/site/xdoc/options.xml @@ -148,6 +148,12 @@ under the License. 
<td>Enable the Metrics system</td> </tr> <tr> + <td>giraph.nettyAutoRead</td> + <td>boolean</td> + <td>true</td> + <td>Whether netty should pro-actively read requests and feed them to its processing pipeline</td> + </tr> + <tr> <td>giraph.nettyClientUseExecutionHandler</td> <td>boolean</td> <td>true</td> @@ -376,6 +382,12 @@ under the License. <td>Class which decides whether a failed job should be retried - optional</td> </tr> <tr> + <td>giraph.mapper.observers</td> + <td>class</td> + <td>null</td> + <td>Classes for Mapper Observer - optional</td> + </tr> + <tr> <td>giraph.mappingInputFormatClass</td> <td>class</td> <td>null</td> @@ -820,6 +832,12 @@ under the License. <td>Maximum timeout (in ms) for waiting for all all tasks to complete</td> </tr> <tr> + <td>giraph.waitTimeBetweenConnectionRetriesMs</td> + <td>integer</td> + <td>500</td> + <td>How long to wait (in ms) before trying to reconnect failed connections</td> + </tr> + <tr> <td>giraph.waitingRequestMsecs</td> <td>integer</td> <td>15000</td>
