Hi,

I had a similar issue; it was caused by long GC pauses. I patched NettyClient so that when a reconnect fails, it sleeps for some time before the next try. The patch is enclosed. Let me know if it works for you. I would also try tuning GC. You can also try the giraph.waitForRequestsConfirmation and giraph.maxNumberOfOpenRequests options.
I hope I am right.
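
For reference, the idea behind the patch can be sketched in plain Java. This is only a minimal sketch, not the patch itself: it uses no Giraph or Netty classes, and RetryWithSleep, retry, maxAttempts and sleepMillis are names made up for illustration. The enclosed NettyClient does the equivalent inside getNextChannel, with a fixed 5 second sleep and maxConnectionFailures as the limit.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: retry an operation, pausing after each failure. */
final class RetryWithSleep {
  private RetryWithSleep() { }

  /**
   * Runs the attempt until it succeeds or maxAttempts failures have occurred.
   *
   * @param attempt Operation to try, e.g. opening a connection
   * @param maxAttempts Maximum number of failed attempts allowed
   * @param sleepMillis Pause after each failed attempt, in milliseconds
   * @param <T> Result type of the operation
   * @return Result of the first successful attempt
   */
  static <T> T retry(Callable<T> attempt, int maxAttempts, long sleepMillis) {
    Exception lastFailure = null;
    int failures = 0;
    while (failures < maxAttempts) {
      try {
        return attempt.call();
      } catch (Exception e) {
        lastFailure = e;
        ++failures;
        try {
          // Give the remote side (e.g. a worker stuck in a long GC pause)
          // time to recover before trying again.
          Thread.sleep(sleepMillis);
        } catch (InterruptedException ie) {
          // Restore the interrupt flag and stop retrying.
          Thread.currentThread().interrupt();
          throw new IllegalStateException("retry: Interrupted while sleeping", ie);
        }
      }
    }
    throw new IllegalStateException(
        "retry: Failed after " + failures + " attempts", lastFailure);
  }
}
```

Without the sleep, all the retries can be used up within a few milliseconds, which is much shorter than a long GC pause on the other worker.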

Regards
Lukas


On 4.4.2014 22:49, Suijian Zhou wrote:
Hi,
I have a ZooKeeper problem when running a Giraph program. The program is aborted in superstep 2 with:
14/04/04 15:44:48 INFO zookeeper.ClientCnxn: Opening socket connection to server compute-0-18.local/10.1.255.236:22181. Will not attempt to authenticate using SASL (unknown error)
14/04/04 15:44:48 INFO zookeeper.ClientCnxn: Socket connection established to compute-0-18.local/10.1.255.236:22181, initiating session
14/04/04 15:44:48 INFO zookeeper.ClientCnxn: Session establishment complete on server compute-0-18.local/10.1.255.236:22181, sessionid = 0x1452e7c79910009, negotiated timeout = 600000
......
14/04/04 15:46:08 INFO job.JobProgressTracker: Data from 8 workers - Compute superstep 2: 0 out of 4847571 vertices computed; 0 out of 64 partitions computed; min free memory on worker 3 - 270.37MB, average 451.21MB
14/04/04 15:46:13 INFO job.JobProgressTracker: Data from 8 workers - Compute superstep 2: 0 out of 4847571 vertices computed; 0 out of 64 partitions computed; min free memory on worker 6 - 249.25MB, average 404.02MB
14/04/04 15:46:16 INFO zookeeper.ClientCnxn: Unable to read additional data from server sessionid 0x1452e7c79910009, likely server has closed socket, closing socket connection and attempting reconnect
14/04/04 15:46:17 INFO zookeeper.ClientCnxn: Opening socket connection to server compute-0-18.local/10.1.255.236:22181. Will not attempt to authenticate using SASL (unknown error)
14/04/04 15:46:17 WARN zookeeper.ClientCnxn: Session 0x1452e7c79910009 for server null, unexpected error, closing socket connection and attempting reconnect
java.net.ConnectException: Connection refused


Each rerun of the program leads to another compute node reporting the same error ("Unable to read additional data from server sessionid...").

The code in superstep 2 is:
  if (getSuperstep() == 2) {
    for (IntWritable message: messages) {
        for (Edge<IntWritable, IntWritable> edge: vertex.getEdges()) {
           sendMessage(edge.getTargetVertexId(), message);
           //int abc=0;
        }
    }
  }

I checked that if I replace the line "sendMessage(edge.getTargetVertexId(), message);" with a meaningless line like "int abc=0;", the program finishes successfully. It looks like a ZooKeeper problem, but ZooKeeper seems to come with Giraph, as I did not install it separately. I tried to modify parameters in GiraphConstants.java and recompile Giraph, but it does not seem to take effect: the screen output shows that the parameters were not changed at all. Any hints?

  Best Regards,
  Suijian


Index: src/main/java/org/apache/giraph/comm/netty/NettyClient.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
<+>/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements.  See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership.  The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License.  You may obtain a copy of the License at\n *\n *     http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\npackage org.apache.giraph.comm.netty;\n\nimport org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;\nimport org.apache.giraph.comm.netty.handler.ClientRequestId;\nimport org.apache.giraph.comm.netty.handler.RequestEncoder;\nimport org.apache.giraph.comm.netty.handler.RequestInfo;\nimport org.apache.giraph.comm.netty.handler.RequestServerHandler;\nimport org.apache.giraph.comm.netty.handler.ResponseClientHandler;\n/*if_not[HADOOP_NON_SECURE]*/\nimport org.apache.giraph.comm.netty.handler.SaslClientHandler;\nimport org.apache.giraph.comm.requests.RequestType;\nimport org.apache.giraph.comm.requests.SaslTokenMessageRequest;\n/*end[HADOOP_NON_SECURE]*/\nimport org.apache.giraph.comm.requests.WritableRequest;\nimport org.apache.giraph.conf.GiraphConstants;\nimport org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;\nimport org.apache.giraph.graph.TaskInfo;\nimport org.apache.giraph.utils.PipelineUtils;\nimport org.apache.giraph.utils.ProgressableUtils;\nimport org.apache.giraph.utils.TimedLogger;\nimport org.apache.hadoop.mapreduce.Mapper;\nimport org.apache.log4j.Logger;\n\nimport com.google.common.collect.Lists;\nimport 
com.google.common.collect.MapMaker;\nimport com.google.common.collect.Maps;\nimport com.google.common.util.concurrent.ThreadFactoryBuilder;\n\nimport java.io.IOException;\nimport java.net.InetSocketAddress;\nimport java.util.Collection;\nimport java.util.Collections;\nimport java.util.Comparator;\nimport java.util.List;\nimport java.util.Map;\nimport java.util.concurrent.ConcurrentMap;\nimport java.util.concurrent.atomic.AtomicInteger;\nimport java.util.concurrent.atomic.AtomicLong;\n\nimport io.netty.bootstrap.Bootstrap;\nimport io.netty.channel.Channel;\nimport io.netty.channel.ChannelFuture;\nimport io.netty.channel.ChannelFutureListener;\nimport io.netty.channel.ChannelInitializer;\nimport io.netty.channel.ChannelOption;\nimport io.netty.channel.EventLoopGroup;\nimport io.netty.channel.nio.NioEventLoopGroup;\nimport io.netty.channel.socket.SocketChannel;\nimport io.netty.channel.socket.nio.NioSocketChannel;\nimport io.netty.handler.codec.FixedLengthFrameDecoder;\nimport io.netty.handler.codec.LengthFieldBasedFrameDecoder;\nimport io.netty.util.AttributeKey;\nimport io.netty.util.concurrent.DefaultEventExecutorGroup;\nimport io.netty.util.concurrent.EventExecutorGroup;\n\nimport static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;\nimport static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;\nimport static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;\nimport static org.apache.giraph.conf.GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS;\nimport static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER;\nimport static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;\nimport static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;\nimport static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;\nimport static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;\n\n/**\n * Netty client for sending requests.  
Thread-safe.\n */\npublic class NettyClient {\n  /** Do we have a limit on number of open requests we can have */\n  public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS =\n      \"giraph.waitForRequestsConfirmation\";\n  /** Default choice about having a limit on number of open requests */\n  public static final boolean LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT = false;\n  /** Maximum number of requests without confirmation we should have */\n  public static final String MAX_NUMBER_OF_OPEN_REQUESTS =\n      \"giraph.maxNumberOfOpenRequests\";\n  /** Default maximum number of requests without confirmation */\n  public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;\n  /** Maximum number of requests to list (for debugging) */\n  public static final int MAX_REQUESTS_TO_LIST = 10;\n  /**\n   * Maximum number of destination task ids with open requests to list\n   * (for debugging)\n   */\n  public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;\n  /** 30 seconds to connect by default */\n  public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;\n/*if_not[HADOOP_NON_SECURE]*/\n  /** Used to authenticate with other workers acting as servers */\n  public static final AttributeKey<SaslNettyClient> SASL =\n      AttributeKey.valueOf(\"saslNettyClient\");\n/*end[HADOOP_NON_SECURE]*/\n  /** Class logger */\n  private static final Logger LOG = Logger.getLogger(NettyClient.class);\n  /** Context used to report progress */\n  private final Mapper<?, ?, ?, ?>.Context context;\n  /** Client bootstrap */\n  private final Bootstrap bootstrap;\n  /**\n   * Map of the peer connections, mapping from remote socket address to client\n   * meta data\n   */\n  private final ConcurrentMap<InetSocketAddress, ChannelRotater>\n  addressChannelMap = new MapMaker().makeMap();\n  /**\n   * Map from task id to address of its server\n   */\n  private final Map<Integer, InetSocketAddress> taskIdAddressMap =\n      new MapMaker().makeMap();\n  /**\n   * Request 
map of client request ids to request information.\n   */\n  private final ConcurrentMap<ClientRequestId, RequestInfo>\n  clientRequestIdRequestInfoMap;\n  /** Number of channels per server */\n  private final int channelsPerServer;\n  /** Inbound byte counter for this client */\n  private final InboundByteCounter inboundByteCounter = new\n      InboundByteCounter();\n  /** Outbound byte counter for this client */\n  private final OutboundByteCounter outboundByteCounter = new\n      OutboundByteCounter();\n  /** Send buffer size */\n  private final int sendBufferSize;\n  /** Receive buffer size */\n  private final int receiveBufferSize;\n  /** Do we have a limit on number of open requests */\n  private final boolean limitNumberOfOpenRequests;\n  /** Maximum number of requests without confirmation we can have */\n  private final int maxNumberOfOpenRequests;\n  /** Maximum number of connection failures */\n  private final int maxConnectionFailures;\n  /** Maximum number of milliseconds for a request */\n  private final int maxRequestMilliseconds;\n  /** Waiting internal for checking outstanding requests msecs */\n  private final int waitingRequestMsecs;\n  /** Timed logger for printing request debugging */\n  private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);\n  /** Worker executor group */\n  private final EventLoopGroup workerGroup;\n  /** Address request id generator */\n  private final AddressRequestIdGenerator addressRequestIdGenerator =\n      new AddressRequestIdGenerator();\n  /** Task info */\n  private final TaskInfo myTaskInfo;\n  /** Maximum thread pool size */\n  private final int maxPoolSize;\n  /** Maximum number of attempts to resolve an address*/\n  private final int maxResolveAddressAttempts;\n  /** Use execution handler? 
*/\n  private final boolean useExecutionGroup;\n  /** EventExecutor Group (if used) */\n  private final EventExecutorGroup executionGroup;\n  /** Name of the handler to use execution group for (if used) */\n  private final String handlerToUseExecutionGroup;\n  /** When was the last time we checked if we should resend some requests */\n  private final AtomicLong lastTimeCheckedRequestsForProblems =\n      new AtomicLong(0);\n\n  /**\n   * Only constructor\n   *\n   * @param context Context for progress\n   * @param conf Configuration\n   * @param myTaskInfo Current task info\n   */\n  public NettyClient(Mapper<?, ?, ?, ?>.Context context,\n                     final ImmutableClassesGiraphConfiguration conf,\n                     TaskInfo myTaskInfo) {\n    this.context = context;\n    this.myTaskInfo = myTaskInfo;\n    this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);\n    sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);\n    receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);\n\n    limitNumberOfOpenRequests = conf.getBoolean(\n        LIMIT_NUMBER_OF_OPEN_REQUESTS,\n        LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT);\n    if (limitNumberOfOpenRequests) {\n      maxNumberOfOpenRequests = conf.getInt(\n          MAX_NUMBER_OF_OPEN_REQUESTS,\n          MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT);\n      if (LOG.isInfoEnabled()) {\n        LOG.info(\"NettyClient: Limit number of open requests to \" +\n            maxNumberOfOpenRequests);\n      }\n    } else {\n      maxNumberOfOpenRequests = -1;\n    }\n\n    maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);\n\n    maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);\n\n    waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);\n\n    maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);\n\n    maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);\n\n    clientRequestIdRequestInfoMap =\n        new 
MapMaker().concurrencyLevel(maxPoolSize).makeMap();\n\n    handlerToUseExecutionGroup =\n        NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);\n    useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);\n    if (useExecutionGroup) {\n      int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);\n      executionGroup = new DefaultEventExecutorGroup(executionThreads,\n          new ThreadFactoryBuilder().setNameFormat(\"netty-client-exec-%d\")\n              .build());\n      if (LOG.isInfoEnabled()) {\n        LOG.info(\"NettyClient: Using execution handler with \" +\n            executionThreads + \" threads after \" +\n            handlerToUseExecutionGroup + \".\");\n      }\n    } else {\n      executionGroup = null;\n    }\n\n    workerGroup = new NioEventLoopGroup(maxPoolSize,\n        new ThreadFactoryBuilder().setNameFormat(\n            \"netty-client-worker-%d\").build());\n\n    bootstrap = new Bootstrap();\n    bootstrap.group(workerGroup)\n        .channel(NioSocketChannel.class)\n        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,\n            MAX_CONNECTION_MILLISECONDS_DEFAULT)\n        .option(ChannelOption.TCP_NODELAY, true)\n        .option(ChannelOption.SO_KEEPALIVE, true)\n        .option(ChannelOption.SO_SNDBUF, sendBufferSize)\n        .option(ChannelOption.SO_RCVBUF, receiveBufferSize)\n        .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())\n        .handler(new ChannelInitializer<SocketChannel>() {\n          @Override\n          protected void initChannel(SocketChannel ch) throws Exception {\n      /*if_not[HADOOP_NON_SECURE]*/\n            if (conf.authenticate()) {\n              LOG.info(\"Using Netty with authentication.\");\n\n              // Our pipeline starts with just byteCounter, and then we use\n              // addLast() to incrementally add pipeline elements, so that we\n              // can name them for identification for removal or replacement\n              // after client is 
authenticated by server.\n              // After authentication is complete, the pipeline's SASL-specific\n              // functionality is removed, restoring the pipeline to exactly the\n              // same configuration as it would be without authentication.\n              PipelineUtils.addLastWithExecutorCheck(\"clientInboundByteCounter\",\n                  inboundByteCounter, handlerToUseExecutionGroup,\n                  executionGroup, ch);\n              PipelineUtils.addLastWithExecutorCheck(\n                  \"clientOutboundByteCounter\",\n                  outboundByteCounter, handlerToUseExecutionGroup,\n                  executionGroup, ch);\n\n              // The following pipeline component is needed to decode the\n              // server's SASL tokens. It is replaced with a\n              // FixedLengthFrameDecoder (same as used with the\n              // non-authenticated pipeline) after authentication\n              // completes (as in non-auth pipeline below).\n              PipelineUtils.addLastWithExecutorCheck(\n                  \"length-field-based-frame-decoder\",\n                  new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),\n                  handlerToUseExecutionGroup, executionGroup, ch);\n              PipelineUtils.addLastWithExecutorCheck(\"request-encoder\",\n                  new RequestEncoder(conf), handlerToUseExecutionGroup,\n                  executionGroup, ch);\n              // The following pipeline component responds to the server's SASL\n              // tokens with its own responses. 
Both client and server share the\n              // same Hadoop Job token, which is used to create the SASL\n              // tokens to authenticate with each other.\n              // After authentication finishes, this pipeline component\n              // is removed.\n              PipelineUtils.addLastWithExecutorCheck(\"sasl-client-handler\",\n                  new SaslClientHandler(conf), handlerToUseExecutionGroup,\n                  executionGroup, ch);\n              PipelineUtils.addLastWithExecutorCheck(\"response-handler\",\n                  new ResponseClientHandler(clientRequestIdRequestInfoMap,\n                      conf), handlerToUseExecutionGroup, executionGroup, ch);\n            } else {\n              LOG.info(\"Using Netty without authentication.\");\n/*end[HADOOP_NON_SECURE]*/\n              PipelineUtils.addLastWithExecutorCheck(\"clientInboundByteCounter\",\n                  inboundByteCounter, handlerToUseExecutionGroup,\n                  executionGroup, ch);\n              PipelineUtils.addLastWithExecutorCheck(\n                  \"clientOutboundByteCounter\",\n                  outboundByteCounter, handlerToUseExecutionGroup,\n                  executionGroup, ch);\n              PipelineUtils.addLastWithExecutorCheck(\n                  \"fixed-length-frame-decoder\",\n                  new FixedLengthFrameDecoder(\n                      RequestServerHandler.RESPONSE_BYTES),\n                 handlerToUseExecutionGroup, executionGroup, ch);\n              PipelineUtils.addLastWithExecutorCheck(\"request-encoder\",\n                    new RequestEncoder(conf), handlerToUseExecutionGroup,\n                  executionGroup, ch);\n              PipelineUtils.addLastWithExecutorCheck(\"response-handler\",\n                    new ResponseClientHandler(clientRequestIdRequestInfoMap,\n                        conf), handlerToUseExecutionGroup, executionGroup, ch);\n\n/*if_not[HADOOP_NON_SECURE]*/\n            }\n/*end[HADOOP_NON_SECURE]*/\n  
        }\n        });\n  }\n\n  /**\n   * Pair object for connectAllAddresses().\n   */\n  private static class ChannelFutureAddress {\n    /** Future object */\n    private final ChannelFuture future;\n    /** Address of the future */\n    private final InetSocketAddress address;\n    /** Task id */\n    private final Integer taskId;\n\n    /**\n     * Constructor.\n     *\n     * @param future Immutable future\n     * @param address Immutable address\n     * @param taskId Immutable taskId\n     */\n    ChannelFutureAddress(\n        ChannelFuture future, InetSocketAddress address, Integer taskId) {\n      this.future = future;\n      this.address = address;\n      this.taskId = taskId;\n    }\n\n    @Override\n    public String toString() {\n      return \"(future=\" + future + \",address=\" + address + \",taskId=\" +\n          taskId + \")\";\n    }\n  }\n\n  /**\n   * Connect to a collection of tasks servers\n   *\n   * @param tasks Tasks to connect to (if haven't already connected)\n   */\n  public void connectAllAddresses(Collection<? 
extends TaskInfo> tasks) {\n    List<ChannelFutureAddress> waitingConnectionList =\n        Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);\n    for (TaskInfo taskInfo : tasks) {\n      context.progress();\n      InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId());\n      if (address == null ||\n          !address.getHostName().equals(taskInfo.getHostname()) ||\n          address.getPort() != taskInfo.getPort()) {\n        address = resolveAddress(maxResolveAddressAttempts,\n            taskInfo.getInetSocketAddress());\n        taskIdAddressMap.put(taskInfo.getTaskId(), address);\n      }\n      if (address == null || address.getHostName() == null ||\n          address.getHostName().isEmpty()) {\n        throw new IllegalStateException(\"connectAllAddresses: Null address \" +\n            \"in addresses \" + tasks);\n      }\n      if (address.isUnresolved()) {\n        throw new IllegalStateException(\"connectAllAddresses: Unresolved \" +\n            \"address \" + address);\n      }\n\n      if (addressChannelMap.containsKey(address)) {\n        continue;\n      }\n\n      // Start connecting to the remote server up to n time\n      for (int i = 0; i < channelsPerServer; ++i) {\n        ChannelFuture connectionFuture = bootstrap.connect(address);\n\n        waitingConnectionList.add(\n            new ChannelFutureAddress(\n                connectionFuture, address, taskInfo.getTaskId()));\n      }\n    }\n\n    // Wait for all the connections to succeed up to n tries\n    int failures = 0;\n    int connected = 0;\n    while (failures < maxConnectionFailures) {\n      List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();\n      for (ChannelFutureAddress waitingConnection : waitingConnectionList) {\n        context.progress();\n        ChannelFuture future = waitingConnection.future;\n        ProgressableUtils.awaitChannelFuture(future, context);\n        if (!future.isSuccess()) {\n          
LOG.warn(\"connectAllAddresses: Future failed \" +\n              \"to connect with \" + waitingConnection.address + \" with \" +\n              failures + \" failures because of \" + future.cause());\n\n          ChannelFuture connectionFuture =\n              bootstrap.connect(waitingConnection.address);\n          nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,\n              waitingConnection.address, waitingConnection.taskId));\n          ++failures;\n        } else {\n          Channel channel = future.channel();\n          if (LOG.isDebugEnabled()) {\n            LOG.debug(\"connectAllAddresses: Connected to \" +\n                channel.remoteAddress() + \", open = \" + channel.isOpen());\n          }\n\n          if (channel.remoteAddress() == null) {\n            throw new IllegalStateException(\n                \"connectAllAddresses: Null remote address!\");\n          }\n\n          ChannelRotater rotater =\n              addressChannelMap.get(waitingConnection.address);\n          if (rotater == null) {\n            ChannelRotater newRotater =\n                new ChannelRotater(waitingConnection.taskId);\n            rotater = addressChannelMap.putIfAbsent(\n                waitingConnection.address, newRotater);\n            if (rotater == null) {\n              rotater = newRotater;\n            }\n          }\n          rotater.addChannel(future.channel());\n          ++connected;\n        }\n      }\n      LOG.info(\"connectAllAddresses: Successfully added \" +\n          (waitingConnectionList.size() - nextCheckFutures.size()) +\n          \" connections, (\" + connected + \" total connected) \" +\n          nextCheckFutures.size() + \" failed, \" +\n          failures + \" failures total.\");\n      if (nextCheckFutures.isEmpty()) {\n        break;\n      }\n      waitingConnectionList = nextCheckFutures;\n    }\n    if (failures >= maxConnectionFailures) {\n      throw new IllegalStateException(\n          \"connectAllAddresses: 
Too many failures (\" + failures + \").\");\n    }\n  }\n\n/*if_not[HADOOP_NON_SECURE]*/\n  /**\n   * Authenticate all servers in addressChannelMap.\n   */\n  public void authenticate() {\n    LOG.info(\"authenticate: NettyClient starting authentication with \" +\n        \"servers.\");\n    for (InetSocketAddress address: addressChannelMap.keySet()) {\n      if (LOG.isDebugEnabled()) {\n        LOG.debug(\"authenticate: Authenticating with address:\" + address);\n      }\n      ChannelRotater channelRotater = addressChannelMap.get(address);\n      for (Channel channel: channelRotater.getChannels()) {\n        if (LOG.isDebugEnabled()) {\n          LOG.debug(\"authenticate: Authenticating with server on channel: \" +\n              channel);\n        }\n        authenticateOnChannel(channelRotater.getTaskId(), channel);\n      }\n    }\n    if (LOG.isInfoEnabled()) {\n      LOG.info(\"authenticate: NettyClient successfully authenticated with \" +\n          addressChannelMap.size() + \" server\" +\n          ((addressChannelMap.size() != 1) ? 
\"s\" : \"\") +\n          \" - continuing with normal work.\");\n    }\n  }\n\n  /**\n   * Authenticate with server connected at given channel.\n   *\n   * @param taskId Task id of the channel\n   * @param channel Connection to server to authenticate with.\n   */\n  private void authenticateOnChannel(Integer taskId, Channel channel) {\n    try {\n      SaslNettyClient saslNettyClient = channel.attr(SASL).get();\n      if (channel.attr(SASL).get() == null) {\n        if (LOG.isDebugEnabled()) {\n          LOG.debug(\"authenticateOnChannel: Creating saslNettyClient now \" +\n              \"for channel: \" + channel);\n        }\n        saslNettyClient = new SaslNettyClient();\n        channel.attr(SASL).set(saslNettyClient);\n      }\n      if (!saslNettyClient.isComplete()) {\n        if (LOG.isDebugEnabled()) {\n          LOG.debug(\"authenticateOnChannel: Waiting for authentication \" +\n              \"to complete..\");\n        }\n        SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();\n        sendWritableRequest(taskId, saslTokenMessage);\n        // We now wait for Netty's thread pool to communicate over this\n        // channel to authenticate with another worker acting as a server.\n        try {\n          synchronized (saslNettyClient.getAuthenticated()) {\n            while (!saslNettyClient.isComplete()) {\n              saslNettyClient.getAuthenticated().wait();\n            }\n          }\n        } catch (InterruptedException e) {\n          LOG.error(\"authenticateOnChannel: Interrupted while waiting for \" +\n              \"authentication.\");\n        }\n      }\n      if (LOG.isDebugEnabled()) {\n        LOG.debug(\"authenticateOnChannel: Authentication on channel: \" +\n            channel + \" has completed successfully.\");\n      }\n    } catch (IOException e) {\n      LOG.error(\"authenticateOnChannel: Failed to authenticate with server \" +\n          \"due to error: \" + e);\n    }\n    return;\n  
}\n/*end[HADOOP_NON_SECURE]*/\n\n  /**\n   * Stop the client.\n   */\n  public void stop() {\n    if (LOG.isInfoEnabled()) {\n      LOG.info(\"stop: Halting netty client\");\n    }\n    // Close connections asynchronously, in a Netty-approved\n    // way, without cleaning up thread pools until all channels\n    // in addressChannelMap are closed (success or failure)\n    int channelCount = 0;\n    for (ChannelRotater channelRotater : addressChannelMap.values()) {\n      channelCount += channelRotater.size();\n    }\n    final int done = channelCount;\n    final AtomicInteger count = new AtomicInteger(0);\n    for (ChannelRotater channelRotater : addressChannelMap.values()) {\n      channelRotater.closeChannels(new ChannelFutureListener() {\n        @Override\n        public void operationComplete(ChannelFuture cf) {\n          context.progress();\n          if (count.incrementAndGet() == done) {\n            if (LOG.isInfoEnabled()) {\n              LOG.info(\"stop: reached wait threshold, \" +\n                  done + \" connections closed, releasing \" +\n                  \"resources now.\");\n            }\n            workerGroup.shutdownGracefully();\n            if (executionGroup != null) {\n              executionGroup.shutdownGracefully();\n            }\n          }\n        }\n      });\n    }\n    ProgressableUtils.awaitTerminationFuture(workerGroup, context);\n    if (executionGroup != null) {\n      ProgressableUtils.awaitTerminationFuture(executionGroup, context);\n    }\n    if (LOG.isInfoEnabled()) {\n      LOG.info(\"stop: Netty client halted\");\n    }\n  }\n\n  /**\n   * Get the next available channel, reconnecting if necessary\n   *\n   * @param remoteServer Remote server to get a channel for\n   * @return Available channel for this remote server\n   */\n  private Channel getNextChannel(InetSocketAddress remoteServer) {\n    Channel channel = addressChannelMap.get(remoteServer).nextChannel();\n    if (channel == null) {\n      throw new 
IllegalStateException(\n          \"getNextChannel: No channel exists for \" + remoteServer);\n    }\n\n    // Return this channel if it is connected\n    if (channel.isActive()) {\n      return channel;\n    }\n\n    // Get rid of the failed channel\n    if (addressChannelMap.get(remoteServer).removeChannel(channel)) {\n      LOG.warn(\"getNextChannel: Unlikely event that the channel \" +\n          channel + \" was already removed!\");\n    }\n    if (LOG.isInfoEnabled()) {\n      LOG.info(\"getNextChannel: Fixing disconnected channel to \" +\n          remoteServer + \", open = \" + channel.isOpen() + \", \" +\n          \"bound = \" + channel.isRegistered());\n    }\n    int reconnectFailures = 0;\n    while (reconnectFailures < maxConnectionFailures) {\n      ChannelFuture connectionFuture = bootstrap.connect(remoteServer);\n      ProgressableUtils.awaitChannelFuture(connectionFuture, context);\n      if (connectionFuture.isSuccess()) {\n        if (LOG.isInfoEnabled()) {\n          LOG.info(\"getNextChannel: Connected to \" + remoteServer + \"!\");\n        }\n        addressChannelMap.get(remoteServer).addChannel(\n            connectionFuture.channel());\n        return connectionFuture.channel();\n      }\n      ++reconnectFailures;\n      LOG.warn(\"getNextChannel: Failed to reconnect to \" +  remoteServer +\n          \" on attempt \" + reconnectFailures + \" out of \" +\n          maxConnectionFailures + \" max attempts, sleeping for 5 secs\",\n          connectionFuture.cause());\n      try {\n        Thread.sleep(5000);\n      } catch (InterruptedException e) {\n        LOG.warn(\"getNextChannel: Unexpected interrupted exception\", e);\n      }\n    }\n    throw new IllegalStateException(\"getNextChannel: Failed to connect \" +\n        \"to \" + remoteServer + \" in \" + reconnectFailures +\n        \" connect attempts\");\n  }\n\n  /**\n   * Send a request to a remote server (should be already connected)\n   *\n   * @param destTaskId Destination 
task id\n   * @param request Request to send\n   */\n  public void sendWritableRequest(Integer destTaskId,\n      WritableRequest request) {\n    InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);\n    if (clientRequestIdRequestInfoMap.isEmpty()) {\n      inboundByteCounter.resetAll();\n      outboundByteCounter.resetAll();\n    }\n    boolean registerRequest = true;\n/*if_not[HADOOP_NON_SECURE]*/\n    if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {\n      registerRequest = false;\n    }\n/*end[HADOOP_NON_SECURE]*/\n\n    Channel channel = getNextChannel(remoteServer);\n    RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);\n    if (registerRequest) {\n      request.setClientId(myTaskInfo.getTaskId());\n      request.setRequestId(\n        addressRequestIdGenerator.getNextRequestId(remoteServer));\n      ClientRequestId clientRequestId =\n        new ClientRequestId(destTaskId, request.getRequestId());\n      RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(\n        clientRequestId, newRequestInfo);\n      if (oldRequestInfo != null) {\n        throw new IllegalStateException(\"sendWritableRequest: Impossible to \" +\n          \"have a previous request id = \" + request.getRequestId() + \", \" +\n          \"request info of \" + oldRequestInfo);\n      }\n    }\n    ChannelFuture writeFuture = channel.write(request);\n    newRequestInfo.setWriteFuture(writeFuture);\n\n    if (limitNumberOfOpenRequests &&\n        clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {\n      waitSomeRequests(maxNumberOfOpenRequests);\n    }\n  }\n\n  /**\n   * Ensure all the request sent so far are complete.\n   *\n   * @throws InterruptedException\n   */\n  public void waitAllRequests() {\n    waitSomeRequests(0);\n    if (LOG.isInfoEnabled()) {\n      LOG.info(\"waitAllRequests: Finished all requests. 
\" +\n          inboundByteCounter.getMetrics() + \"\\n\" + outboundByteCounter\n          .getMetrics());\n    }\n  }\n\n  /**\n   * Ensure that at most maxOpenRequests are not complete.  Periodically,\n   * check the state of every request.  If we find the connection failed,\n   * re-establish it and re-send the request.\n   *\n   * @param maxOpenRequests Maximum number of requests which can be not\n   *                        complete\n   */\n  private void waitSomeRequests(int maxOpenRequests) {\n    while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {\n      // Wait for requests to complete for some time\n      logInfoAboutOpenRequests(maxOpenRequests);\n      synchronized (clientRequestIdRequestInfoMap) {\n        if (clientRequestIdRequestInfoMap.size() <= maxOpenRequests) {\n          break;\n        }\n        try {\n          clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);\n        } catch (InterruptedException e) {\n          LOG.error(\"waitSomeRequests: Got unexpected InterruptedException\", e);\n        }\n      }\n      // Make sure that waiting doesn't kill the job\n      context.progress();\n\n      checkRequestsForProblems();\n    }\n  }\n\n  /**\n   * Log the status of open requests.\n   *\n   * @param maxOpenRequests Maximum number of requests which can be not complete\n   */\n  private void logInfoAboutOpenRequests(int maxOpenRequests) {\n    if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {\n      LOG.info(\"logInfoAboutOpenRequests: Waiting interval of \" +\n          waitingRequestMsecs + \" msecs, \" +\n          clientRequestIdRequestInfoMap.size() +\n          \" open requests, waiting for it to be <= \" + maxOpenRequests +\n          \", \" + inboundByteCounter.getMetrics() + \"\\n\" +\n          outboundByteCounter.getMetrics());\n\n      if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {\n        for (Map.Entry<ClientRequestId, RequestInfo> entry :\n            
clientRequestIdRequestInfoMap.entrySet()) {\n          LOG.info(\"logInfoAboutOpenRequests: Waiting for request \" +\n              entry.getKey() + \" - \" + entry.getValue());\n        }\n      }\n\n      // Count how many open requests each task has\n      Map<Integer, Integer> openRequestCounts = Maps.newHashMap();\n      for (ClientRequestId clientRequestId :\n          clientRequestIdRequestInfoMap.keySet()) {\n        int taskId = clientRequestId.getDestinationTaskId();\n        Integer currentCount = openRequestCounts.get(taskId);\n        openRequestCounts.put(taskId,\n            (currentCount == null ? 0 : currentCount) + 1);\n      }\n      // Sort it in decreasing order of number of open requests\n      List<Map.Entry<Integer, Integer>> sorted =\n          Lists.newArrayList(openRequestCounts.entrySet());\n      Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {\n        @Override\n        public int compare(Map.Entry<Integer, Integer> entry1,\n            Map.Entry<Integer, Integer> entry2) {\n          int value1 = entry1.getValue();\n          int value2 = entry2.getValue();\n          return (value1 < value2) ? 1 : ((value1 == value2) ? 
0 : -1);\n        }\n      });\n      // Print task ids which have the most open requests\n      StringBuilder message = new StringBuilder();\n      message.append(\"logInfoAboutOpenRequests: \");\n      int itemsToPrint =\n          Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size());\n      for (int i = 0; i < itemsToPrint; i++) {\n        message.append(sorted.get(i).getValue())\n            .append(\" requests for taskId=\")\n            .append(sorted.get(i).getKey())\n            .append(\", \");\n      }\n      LOG.info(message);\n    }\n  }\n\n  /**\n   * Check if there are some open requests which have been sent a long time\n   * ago, and if so resend them.\n   */\n  private void checkRequestsForProblems() {\n    long lastTimeChecked = lastTimeCheckedRequestsForProblems.get();\n    // If not enough time passed from the previous check, return\n    if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) {\n      return;\n    }\n    // If another thread did the check already, return\n    if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked,\n        System.currentTimeMillis())) {\n      return;\n    }\n    List<ClientRequestId> addedRequestIds = Lists.newArrayList();\n    List<RequestInfo> addedRequestInfos = Lists.newArrayList();\n    // Check all the requests for problems\n    for (Map.Entry<ClientRequestId, RequestInfo> entry :\n        clientRequestIdRequestInfoMap.entrySet()) {\n      RequestInfo requestInfo = entry.getValue();\n      ChannelFuture writeFuture = requestInfo.getWriteFuture();\n      // Request wasn't sent yet\n      if (writeFuture == null) {\n        continue;\n      }\n      // If not connected anymore, request failed, or the request is taking\n      // too long, re-establish and resend\n      if (!writeFuture.channel().isActive() ||\n          (writeFuture.isDone() && !writeFuture.isSuccess()) ||\n          (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {\n        
LOG.warn(\"checkRequestsForProblems: Problem with request id \" +\n            entry.getKey() + \" connected = \" +\n            writeFuture.channel().isActive() +\n            \", future done = \" + writeFuture.isDone() + \", \" +\n            \"success = \" + writeFuture.isSuccess() + \", \" +\n            \"cause = \" + writeFuture.cause() + \", \" +\n            \"elapsed time = \" + requestInfo.getElapsedMsecs() + \", \" +\n            \"destination = \" + writeFuture.channel().remoteAddress() +\n            \" \" + requestInfo);\n        addedRequestIds.add(entry.getKey());\n        addedRequestInfos.add(new RequestInfo(\n            requestInfo.getDestinationAddress(), requestInfo.getRequest()));\n      }\n    }\n\n    // Add any new requests to the system, connect if necessary, and re-send\n    for (int i = 0; i < addedRequestIds.size(); ++i) {\n      ClientRequestId requestId = addedRequestIds.get(i);\n      RequestInfo requestInfo = addedRequestInfos.get(i);\n\n      if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==\n          null) {\n        LOG.warn(\"checkRequestsForProblems: Request \" + requestId +\n            \" completed prior to sending the next request\");\n        clientRequestIdRequestInfoMap.remove(requestId);\n      }\n      InetSocketAddress remoteServer = requestInfo.getDestinationAddress();\n      Channel channel = getNextChannel(remoteServer);\n      if (LOG.isInfoEnabled()) {\n        LOG.info(\"checkRequestsForProblems: Re-issuing request \" + requestInfo);\n      }\n      ChannelFuture writeFuture = channel.write(requestInfo.getRequest());\n      requestInfo.setWriteFuture(writeFuture);\n    }\n    addedRequestIds.clear();\n    addedRequestInfos.clear();\n  }\n\n  /**\n   * Utility method for resolving addresses\n   *\n   * @param maxResolveAddressAttempts Maximum number of attempts to resolve the\n   *        address\n   * @param address The address we are attempting to resolve\n   * @return The successfully resolved 
address.\n   * @throws IllegalStateException if the address is not resolved\n   *         in <code>maxResolveAddressAttempts</code> tries.\n   */\n  private static InetSocketAddress resolveAddress(\n      int maxResolveAddressAttempts, InetSocketAddress address) {\n    int resolveAttempts = 0;\n    while (address.isUnresolved() &&\n        resolveAttempts < maxResolveAddressAttempts) {\n      ++resolveAttempts;\n      LOG.warn(\"resolveAddress: Failed to resolve \" + address +\n          \" on attempt \" + resolveAttempts + \" of \" +\n          maxResolveAddressAttempts + \" attempts, sleeping for 5 seconds\");\n      try {\n        Thread.sleep(5000);\n      } catch (InterruptedException e) {\n        LOG.warn(\"resolveAddress: Interrupted.\", e);\n      }\n      address = new InetSocketAddress(address.getHostName(),\n          address.getPort());\n    }\n    if (resolveAttempts >= maxResolveAddressAttempts) {\n      throw new IllegalStateException(\"resolveAddress: Couldn't \" +\n          \"resolve \" + address + \" in \" +  resolveAttempts + \" tries.\");\n    }\n    return address;\n  }\n}\n
===================================================================
--- src/main/java/org/apache/giraph/comm/netty/NettyClient.java	(revision 01d11687dd2e89f97e0b5c557bbfd0cefb6ed625)
+++ src/main/java/org/apache/giraph/comm/netty/NettyClient.java	(revision )
@@ -153,6 +153,10 @@
   private final int maxRequestMilliseconds;
   /** Waiting internal for checking outstanding requests msecs */
   private final int waitingRequestMsecs;
+
+  /** Wait time in msecs between connection attempts after a failure */
+  private int sleepDelay = 100;
+
   /** Timed logger for printing request debugging */
   private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
   /** Worker executor group */
@@ -403,6 +407,7 @@
     // Wait for all the connections to succeed up to n tries
     int failures = 0;
     int connected = 0;
+    sleepDelay = 100;
     while (failures < maxConnectionFailures) {
       List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
       for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
@@ -453,6 +458,19 @@
           failures + " failures total.");
       if (nextCheckFutures.isEmpty()) {
         break;
+      } else {
+        try {
+          LOG.info("FIX: Waiting " + sleepDelay +
+                  " ms for " + nextCheckFutures.size() + " connections");
+          Thread.sleep(sleepDelay);
+          context.getCounter(
+                  "FIX waiting",
+                  "Waiting for " + sleepDelay + "ms").increment(1);
+          int delay = (int) Math.round(sleepDelay * 1.2);
+          sleepDelay = Math.max(1000, delay);
+        } catch (InterruptedException e) {
+          LOG.error("Waiting failed", e);
+        }
       }
       waitingConnectionList = nextCheckFutures;
     }
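
For anyone adapting the patch: the hunk above sleeps between connection rounds and multiplies the delay by 1.2 each time. As written, Math.max(1000, delay) makes 1000 ms a lower bound after the first wait, so the delay keeps growing from there. If an upper bound is wanted instead, something like the following standalone sketch might do. The class name, the 1000 ms cap and the interrupt handling are my assumptions and are not part of Giraph or of the patch.

```java
/** Standalone sketch; the names here are illustrative, not part of Giraph. */
final class ReconnectBackoff {
  /** Growth factor, same value as in the patch. */
  static final double GROWTH = 1.2;
  /** Assumed upper bound on a single wait, in milliseconds. */
  static final int MAX_DELAY_MS = 1000;

  private ReconnectBackoff() { }

  /** Returns the next wait: current delay times 1.2, capped at MAX_DELAY_MS. */
  static int nextDelay(int currentDelayMs) {
    long grown = Math.round(currentDelayMs * GROWTH);
    return (int) Math.min(MAX_DELAY_MS, grown);
  }

  /**
   * Sleeps for delayMs. If interrupted, restores the interrupt flag so the
   * caller can still see it, and returns false.
   */
  static boolean sleepQuietly(int delayMs) {
    try {
      Thread.sleep(delayMs);
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    int delay = 100; // same starting value as sleepDelay in the patch
    for (int attempt = 1; attempt <= 5; attempt++) {
      System.out.println("attempt " + attempt + ": waiting " + delay + " ms");
      if (!sleepQuietly(delay)) {
        break;
      }
      delay = nextDelay(delay);
    }
  }
}
```

With long GC pauses a larger cap may be more appropriate; the value above is only a placeholder.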
