Hi,
I had similar issue, it was caused by long GC pauses. I patched
NettyClient so when reconnect fails it sleeps for some time before next
try. Patch is enclosed. Let me know if it works for you.
I would try tuning GC. You can also try to use
giraph.waitForRequestsConfirmation and giraph.maxNumberOfOpenRequests .
Regards
I hope I am right
Lukas
On 4.4.2014 22:49, Suijian Zhou wrote:
Hi,
I have a zookeeper problem when running a giraph program, the
program will be aborted in superstep 2 as:
14/04/04 15:44:48 INFO zookeeper.ClientCnxn: Opening socket connection
to server compute-0-18.local/10.1.255.236:22181
<http://10.1.255.236:22181>. Will not attempt to authenticate using
SASL (unknown error)
14/04/04 15:44:48 INFO zookeeper.ClientCnxn: Socket connection
established to compute-0-18.local/10.1.255.236:22181
<http://10.1.255.236:22181>, initiating session
14/04/04 15:44:48 INFO zookeeper.ClientCnxn: Session establishment
complete on server compute-0-18.local/10.1.255.236:22181
<http://10.1.255.236:22181>, sessionid = 0x1452e7c79910009, negotiated
timeout = 600000
......
14/04/04 15:46:08 INFO job.JobProgressTracker: Data from 8 workers -
Compute superstep 2: 0 out of 4847571 vertices computed; 0 out of 64
partitions computed; min free memory on worker 3 - 270.37MB, average
451.21MB
14/04/04 15:46:13 INFO job.JobProgressTracker: Data from 8 workers -
Compute superstep 2: 0 out of 4847571 vertices computed; 0 out of 64
partitions computed; min free memory on worker 6 - 249.25MB, average
404.02MB
14/04/04 15:46:16 INFO zookeeper.ClientCnxn: Unable to read additional
data from server sessionid 0x1452e7c79910009, likely server has closed
socket, closing socket connection and attempting reconnect
14/04/04 15:46:17 INFO zookeeper.ClientCnxn: Opening socket connection
to server compute-0-18.local/10.1.255.236:22181
<http://10.1.255.236:22181>. Will not attempt to authenticate using
SASL (unknown error)
14/04/04 15:46:17 WARN zookeeper.ClientCnxn: Session 0x1452e7c79910009
for server null, unexpected error, closing socket connection and
attempting reconnect
java.net.ConnectException: Connection refused
Each rerun of the program will lead to another computing node
reporting the same error("Unable to read additional data from server
sessionid...").
What in superstep 2 are:
if (getSuperstep() == 2) {
for (IntWritable message: messages) {
for (Edge<IntWritable, IntWritable> edge: vertex.getEdges()) {
sendMessage(edge.getTargetVertexId(), message);
//int abc=0;
}
}
}
Checked that if I replace the line
"sendMessage(edge.getTargetVertexId(), message);" to another
meaningless line like "int abc=0;", the program could be finished
successfully. Seems a ZooKeeper problem but this seems comes with
giraph as I did not install ZooKeeper seperately. I tried to modify
parameters in GiraphConstants.java and re-compile giraph, but it seems
do not take any effects as I see in the screen output the parameters
were not changed at all. Any hints?
Best Regards,
Suijian
Index: src/main/java/org/apache/giraph/comm/netty/NettyClient.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
Subsystem: com.intellij.openapi.diff.impl.patch.BaseRevisionTextPatchEP
<+>/*\n * Licensed to the Apache Software Foundation (ASF) under one\n * or more contributor license agreements. See the NOTICE file\n * distributed with this work for additional information\n * regarding copyright ownership. The ASF licenses this file\n * to you under the Apache License, Version 2.0 (the\n * \"License\"); you may not use this file except in compliance\n * with the License. You may obtain a copy of the License at\n *\n * http://www.apache.org/licenses/LICENSE-2.0\n *\n * Unless required by applicable law or agreed to in writing, software\n * distributed under the License is distributed on an \"AS IS\" BASIS,\n * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n * See the License for the specific language governing permissions and\n * limitations under the License.\n */\n\npackage org.apache.giraph.comm.netty;\n\nimport org.apache.giraph.comm.netty.handler.AddressRequestIdGenerator;\nimport org.apache.giraph.comm.netty.handler.ClientRequestId;\nimport org.apache.giraph.comm.netty.handler.RequestEncoder;\nimport org.apache.giraph.comm.netty.handler.RequestInfo;\nimport org.apache.giraph.comm.netty.handler.RequestServerHandler;\nimport org.apache.giraph.comm.netty.handler.ResponseClientHandler;\n/*if_not[HADOOP_NON_SECURE]*/\nimport org.apache.giraph.comm.netty.handler.SaslClientHandler;\nimport org.apache.giraph.comm.requests.RequestType;\nimport org.apache.giraph.comm.requests.SaslTokenMessageRequest;\n/*end[HADOOP_NON_SECURE]*/\nimport org.apache.giraph.comm.requests.WritableRequest;\nimport org.apache.giraph.conf.GiraphConstants;\nimport org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;\nimport org.apache.giraph.graph.TaskInfo;\nimport org.apache.giraph.utils.PipelineUtils;\nimport org.apache.giraph.utils.ProgressableUtils;\nimport org.apache.giraph.utils.TimedLogger;\nimport org.apache.hadoop.mapreduce.Mapper;\nimport org.apache.log4j.Logger;\n\nimport com.google.common.collect.Lists;\nimport com.google.common.collect.MapMaker;\nimport com.google.common.collect.Maps;\nimport com.google.common.util.concurrent.ThreadFactoryBuilder;\n\nimport java.io.IOException;\nimport java.net.InetSocketAddress;\nimport java.util.Collection;\nimport java.util.Collections;\nimport java.util.Comparator;\nimport java.util.List;\nimport java.util.Map;\nimport java.util.concurrent.ConcurrentMap;\nimport java.util.concurrent.atomic.AtomicInteger;\nimport java.util.concurrent.atomic.AtomicLong;\n\nimport io.netty.bootstrap.Bootstrap;\nimport io.netty.channel.Channel;\nimport io.netty.channel.ChannelFuture;\nimport io.netty.channel.ChannelFutureListener;\nimport io.netty.channel.ChannelInitializer;\nimport io.netty.channel.ChannelOption;\nimport io.netty.channel.EventLoopGroup;\nimport io.netty.channel.nio.NioEventLoopGroup;\nimport io.netty.channel.socket.SocketChannel;\nimport io.netty.channel.socket.nio.NioSocketChannel;\nimport io.netty.handler.codec.FixedLengthFrameDecoder;\nimport io.netty.handler.codec.LengthFieldBasedFrameDecoder;\nimport io.netty.util.AttributeKey;\nimport io.netty.util.concurrent.DefaultEventExecutorGroup;\nimport io.netty.util.concurrent.EventExecutorGroup;\n\nimport static org.apache.giraph.conf.GiraphConstants.CLIENT_RECEIVE_BUFFER_SIZE;\nimport static org.apache.giraph.conf.GiraphConstants.CLIENT_SEND_BUFFER_SIZE;\nimport static org.apache.giraph.conf.GiraphConstants.MAX_REQUEST_MILLISECONDS;\nimport static org.apache.giraph.conf.GiraphConstants.MAX_RESOLVE_ADDRESS_ATTEMPTS;\nimport static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_AFTER_HANDLER;\nimport static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;\nimport static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;\nimport static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;\nimport static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;\n\n/**\n * Netty client for sending requests. Thread-safe.\n */\npublic class NettyClient {\n /** Do we have a limit on number of open requests we can have */\n public static final String LIMIT_NUMBER_OF_OPEN_REQUESTS =\n \"giraph.waitForRequestsConfirmation\";\n /** Default choice about having a limit on number of open requests */\n public static final boolean LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT = false;\n /** Maximum number of requests without confirmation we should have */\n public static final String MAX_NUMBER_OF_OPEN_REQUESTS =\n \"giraph.maxNumberOfOpenRequests\";\n /** Default maximum number of requests without confirmation */\n public static final int MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT = 10000;\n /** Maximum number of requests to list (for debugging) */\n public static final int MAX_REQUESTS_TO_LIST = 10;\n /**\n * Maximum number of destination task ids with open requests to list\n * (for debugging)\n */\n public static final int MAX_DESTINATION_TASK_IDS_TO_LIST = 10;\n /** 30 seconds to connect by default */\n public static final int MAX_CONNECTION_MILLISECONDS_DEFAULT = 30 * 1000;\n/*if_not[HADOOP_NON_SECURE]*/\n /** Used to authenticate with other workers acting as servers */\n public static final AttributeKey<SaslNettyClient> SASL =\n AttributeKey.valueOf(\"saslNettyClient\");\n/*end[HADOOP_NON_SECURE]*/\n /** Class logger */\n private static final Logger LOG = Logger.getLogger(NettyClient.class);\n /** Context used to report progress */\n private final Mapper<?, ?, ?, ?>.Context context;\n /** Client bootstrap */\n private final Bootstrap bootstrap;\n /**\n * Map of the peer connections, mapping from remote socket address to client\n * meta data\n */\n private final ConcurrentMap<InetSocketAddress, ChannelRotater>\n addressChannelMap = new MapMaker().makeMap();\n /**\n * Map from task id to address of its server\n */\n private final Map<Integer, InetSocketAddress> taskIdAddressMap =\n new MapMaker().makeMap();\n /**\n * Request map of client request ids to request information.\n */\n private final ConcurrentMap<ClientRequestId, RequestInfo>\n clientRequestIdRequestInfoMap;\n /** Number of channels per server */\n private final int channelsPerServer;\n /** Inbound byte counter for this client */\n private final InboundByteCounter inboundByteCounter = new\n InboundByteCounter();\n /** Outbound byte counter for this client */\n private final OutboundByteCounter outboundByteCounter = new\n OutboundByteCounter();\n /** Send buffer size */\n private final int sendBufferSize;\n /** Receive buffer size */\n private final int receiveBufferSize;\n /** Do we have a limit on number of open requests */\n private final boolean limitNumberOfOpenRequests;\n /** Maximum number of requests without confirmation we can have */\n private final int maxNumberOfOpenRequests;\n /** Maximum number of connection failures */\n private final int maxConnectionFailures;\n /** Maximum number of milliseconds for a request */\n private final int maxRequestMilliseconds;\n /** Waiting internal for checking outstanding requests msecs */\n private final int waitingRequestMsecs;\n /** Timed logger for printing request debugging */\n private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);\n /** Worker executor group */\n private final EventLoopGroup workerGroup;\n /** Address request id generator */\n private final AddressRequestIdGenerator addressRequestIdGenerator =\n new AddressRequestIdGenerator();\n /** Task info */\n private final TaskInfo myTaskInfo;\n /** Maximum thread pool size */\n private final int maxPoolSize;\n /** Maximum number of attempts to resolve an address*/\n private final int maxResolveAddressAttempts;\n /** Use execution handler? */\n private final boolean useExecutionGroup;\n /** EventExecutor Group (if used) */\n private final EventExecutorGroup executionGroup;\n /** Name of the handler to use execution group for (if used) */\n private final String handlerToUseExecutionGroup;\n /** When was the last time we checked if we should resend some requests */\n private final AtomicLong lastTimeCheckedRequestsForProblems =\n new AtomicLong(0);\n\n /**\n * Only constructor\n *\n * @param context Context for progress\n * @param conf Configuration\n * @param myTaskInfo Current task info\n */\n public NettyClient(Mapper<?, ?, ?, ?>.Context context,\n final ImmutableClassesGiraphConfiguration conf,\n TaskInfo myTaskInfo) {\n this.context = context;\n this.myTaskInfo = myTaskInfo;\n this.channelsPerServer = GiraphConstants.CHANNELS_PER_SERVER.get(conf);\n sendBufferSize = CLIENT_SEND_BUFFER_SIZE.get(conf);\n receiveBufferSize = CLIENT_RECEIVE_BUFFER_SIZE.get(conf);\n\n limitNumberOfOpenRequests = conf.getBoolean(\n LIMIT_NUMBER_OF_OPEN_REQUESTS,\n LIMIT_NUMBER_OF_OPEN_REQUESTS_DEFAULT);\n if (limitNumberOfOpenRequests) {\n maxNumberOfOpenRequests = conf.getInt(\n MAX_NUMBER_OF_OPEN_REQUESTS,\n MAX_NUMBER_OF_OPEN_REQUESTS_DEFAULT);\n if (LOG.isInfoEnabled()) {\n LOG.info(\"NettyClient: Limit number of open requests to \" +\n maxNumberOfOpenRequests);\n }\n } else {\n maxNumberOfOpenRequests = -1;\n }\n\n maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);\n\n maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);\n\n waitingRequestMsecs = WAITING_REQUEST_MSECS.get(conf);\n\n maxPoolSize = GiraphConstants.NETTY_CLIENT_THREADS.get(conf);\n\n maxResolveAddressAttempts = MAX_RESOLVE_ADDRESS_ATTEMPTS.get(conf);\n\n clientRequestIdRequestInfoMap =\n new MapMaker().concurrencyLevel(maxPoolSize).makeMap();\n\n handlerToUseExecutionGroup =\n NETTY_CLIENT_EXECUTION_AFTER_HANDLER.get(conf);\n useExecutionGroup = NETTY_CLIENT_USE_EXECUTION_HANDLER.get(conf);\n if (useExecutionGroup) {\n int executionThreads = NETTY_CLIENT_EXECUTION_THREADS.get(conf);\n executionGroup = new DefaultEventExecutorGroup(executionThreads,\n new ThreadFactoryBuilder().setNameFormat(\"netty-client-exec-%d\")\n .build());\n if (LOG.isInfoEnabled()) {\n LOG.info(\"NettyClient: Using execution handler with \" +\n executionThreads + \" threads after \" +\n handlerToUseExecutionGroup + \".\");\n }\n } else {\n executionGroup = null;\n }\n\n workerGroup = new NioEventLoopGroup(maxPoolSize,\n new ThreadFactoryBuilder().setNameFormat(\n \"netty-client-worker-%d\").build());\n\n bootstrap = new Bootstrap();\n bootstrap.group(workerGroup)\n .channel(NioSocketChannel.class)\n .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,\n MAX_CONNECTION_MILLISECONDS_DEFAULT)\n .option(ChannelOption.TCP_NODELAY, true)\n .option(ChannelOption.SO_KEEPALIVE, true)\n .option(ChannelOption.SO_SNDBUF, sendBufferSize)\n .option(ChannelOption.SO_RCVBUF, receiveBufferSize)\n .option(ChannelOption.ALLOCATOR, conf.getNettyAllocator())\n .handler(new ChannelInitializer<SocketChannel>() {\n @Override\n protected void initChannel(SocketChannel ch) throws Exception {\n /*if_not[HADOOP_NON_SECURE]*/\n if (conf.authenticate()) {\n LOG.info(\"Using Netty with authentication.\");\n\n // Our pipeline starts with just byteCounter, and then we use\n // addLast() to incrementally add pipeline elements, so that we\n // can name them for identification for removal or replacement\n // after client is authenticated by server.\n // After authentication is complete, the pipeline's SASL-specific\n // functionality is removed, restoring the pipeline to exactly the\n // same configuration as it would be without authentication.\n PipelineUtils.addLastWithExecutorCheck(\"clientInboundByteCounter\",\n inboundByteCounter, handlerToUseExecutionGroup,\n executionGroup, ch);\n PipelineUtils.addLastWithExecutorCheck(\n \"clientOutboundByteCounter\",\n outboundByteCounter, handlerToUseExecutionGroup,\n executionGroup, ch);\n\n // The following pipeline component is needed to decode the\n // server's SASL tokens. It is replaced with a\n // FixedLengthFrameDecoder (same as used with the\n // non-authenticated pipeline) after authentication\n // completes (as in non-auth pipeline below).\n PipelineUtils.addLastWithExecutorCheck(\n \"length-field-based-frame-decoder\",\n new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4),\n handlerToUseExecutionGroup, executionGroup, ch);\n PipelineUtils.addLastWithExecutorCheck(\"request-encoder\",\n new RequestEncoder(conf), handlerToUseExecutionGroup,\n executionGroup, ch);\n // The following pipeline component responds to the server's SASL\n // tokens with its own responses. Both client and server share the\n // same Hadoop Job token, which is used to create the SASL\n // tokens to authenticate with each other.\n // After authentication finishes, this pipeline component\n // is removed.\n PipelineUtils.addLastWithExecutorCheck(\"sasl-client-handler\",\n new SaslClientHandler(conf), handlerToUseExecutionGroup,\n executionGroup, ch);\n PipelineUtils.addLastWithExecutorCheck(\"response-handler\",\n new ResponseClientHandler(clientRequestIdRequestInfoMap,\n conf), handlerToUseExecutionGroup, executionGroup, ch);\n } else {\n LOG.info(\"Using Netty without authentication.\");\n/*end[HADOOP_NON_SECURE]*/\n PipelineUtils.addLastWithExecutorCheck(\"clientInboundByteCounter\",\n inboundByteCounter, handlerToUseExecutionGroup,\n executionGroup, ch);\n PipelineUtils.addLastWithExecutorCheck(\n \"clientOutboundByteCounter\",\n outboundByteCounter, handlerToUseExecutionGroup,\n executionGroup, ch);\n PipelineUtils.addLastWithExecutorCheck(\n \"fixed-length-frame-decoder\",\n new FixedLengthFrameDecoder(\n RequestServerHandler.RESPONSE_BYTES),\n handlerToUseExecutionGroup, executionGroup, ch);\n PipelineUtils.addLastWithExecutorCheck(\"request-encoder\",\n new RequestEncoder(conf), handlerToUseExecutionGroup,\n executionGroup, ch);\n PipelineUtils.addLastWithExecutorCheck(\"response-handler\",\n new ResponseClientHandler(clientRequestIdRequestInfoMap,\n conf), handlerToUseExecutionGroup, executionGroup, ch);\n\n/*if_not[HADOOP_NON_SECURE]*/\n }\n/*end[HADOOP_NON_SECURE]*/\n }\n });\n }\n\n /**\n * Pair object for connectAllAddresses().\n */\n private static class ChannelFutureAddress {\n /** Future object */\n private final ChannelFuture future;\n /** Address of the future */\n private final InetSocketAddress address;\n /** Task id */\n private final Integer taskId;\n\n /**\n * Constructor.\n *\n * @param future Immutable future\n * @param address Immutable address\n * @param taskId Immutable taskId\n */\n ChannelFutureAddress(\n ChannelFuture future, InetSocketAddress address, Integer taskId) {\n this.future = future;\n this.address = address;\n this.taskId = taskId;\n }\n\n @Override\n public String toString() {\n return \"(future=\" + future + \",address=\" + address + \",taskId=\" +\n taskId + \")\";\n }\n }\n\n /**\n * Connect to a collection of tasks servers\n *\n * @param tasks Tasks to connect to (if haven't already connected)\n */\n public void connectAllAddresses(Collection<? extends TaskInfo> tasks) {\n List<ChannelFutureAddress> waitingConnectionList =\n Lists.newArrayListWithCapacity(tasks.size() * channelsPerServer);\n for (TaskInfo taskInfo : tasks) {\n context.progress();\n InetSocketAddress address = taskIdAddressMap.get(taskInfo.getTaskId());\n if (address == null ||\n !address.getHostName().equals(taskInfo.getHostname()) ||\n address.getPort() != taskInfo.getPort()) {\n address = resolveAddress(maxResolveAddressAttempts,\n taskInfo.getInetSocketAddress());\n taskIdAddressMap.put(taskInfo.getTaskId(), address);\n }\n if (address == null || address.getHostName() == null ||\n address.getHostName().isEmpty()) {\n throw new IllegalStateException(\"connectAllAddresses: Null address \" +\n \"in addresses \" + tasks);\n }\n if (address.isUnresolved()) {\n throw new IllegalStateException(\"connectAllAddresses: Unresolved \" +\n \"address \" + address);\n }\n\n if (addressChannelMap.containsKey(address)) {\n continue;\n }\n\n // Start connecting to the remote server up to n time\n for (int i = 0; i < channelsPerServer; ++i) {\n ChannelFuture connectionFuture = bootstrap.connect(address);\n\n waitingConnectionList.add(\n new ChannelFutureAddress(\n connectionFuture, address, taskInfo.getTaskId()));\n }\n }\n\n // Wait for all the connections to succeed up to n tries\n int failures = 0;\n int connected = 0;\n while (failures < maxConnectionFailures) {\n List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();\n for (ChannelFutureAddress waitingConnection : waitingConnectionList) {\n context.progress();\n ChannelFuture future = waitingConnection.future;\n ProgressableUtils.awaitChannelFuture(future, context);\n if (!future.isSuccess()) {\n LOG.warn(\"connectAllAddresses: Future failed \" +\n \"to connect with \" + waitingConnection.address + \" with \" +\n failures + \" failures because of \" + future.cause());\n\n ChannelFuture connectionFuture =\n bootstrap.connect(waitingConnection.address);\n nextCheckFutures.add(new ChannelFutureAddress(connectionFuture,\n waitingConnection.address, waitingConnection.taskId));\n ++failures;\n } else {\n Channel channel = future.channel();\n if (LOG.isDebugEnabled()) {\n LOG.debug(\"connectAllAddresses: Connected to \" +\n channel.remoteAddress() + \", open = \" + channel.isOpen());\n }\n\n if (channel.remoteAddress() == null) {\n throw new IllegalStateException(\n \"connectAllAddresses: Null remote address!\");\n }\n\n ChannelRotater rotater =\n addressChannelMap.get(waitingConnection.address);\n if (rotater == null) {\n ChannelRotater newRotater =\n new ChannelRotater(waitingConnection.taskId);\n rotater = addressChannelMap.putIfAbsent(\n waitingConnection.address, newRotater);\n if (rotater == null) {\n rotater = newRotater;\n }\n }\n rotater.addChannel(future.channel());\n ++connected;\n }\n }\n LOG.info(\"connectAllAddresses: Successfully added \" +\n (waitingConnectionList.size() - nextCheckFutures.size()) +\n \" connections, (\" + connected + \" total connected) \" +\n nextCheckFutures.size() + \" failed, \" +\n failures + \" failures total.\");\n if (nextCheckFutures.isEmpty()) {\n break;\n }\n waitingConnectionList = nextCheckFutures;\n }\n if (failures >= maxConnectionFailures) {\n throw new IllegalStateException(\n \"connectAllAddresses: Too many failures (\" + failures + \").\");\n }\n }\n\n/*if_not[HADOOP_NON_SECURE]*/\n /**\n * Authenticate all servers in addressChannelMap.\n */\n public void authenticate() {\n LOG.info(\"authenticate: NettyClient starting authentication with \" +\n \"servers.\");\n for (InetSocketAddress address: addressChannelMap.keySet()) {\n if (LOG.isDebugEnabled()) {\n LOG.debug(\"authenticate: Authenticating with address:\" + address);\n }\n ChannelRotater channelRotater = addressChannelMap.get(address);\n for (Channel channel: channelRotater.getChannels()) {\n if (LOG.isDebugEnabled()) {\n LOG.debug(\"authenticate: Authenticating with server on channel: \" +\n channel);\n }\n authenticateOnChannel(channelRotater.getTaskId(), channel);\n }\n }\n if (LOG.isInfoEnabled()) {\n LOG.info(\"authenticate: NettyClient successfully authenticated with \" +\n addressChannelMap.size() + \" server\" +\n ((addressChannelMap.size() != 1) ? \"s\" : \"\") +\n \" - continuing with normal work.\");\n }\n }\n\n /**\n * Authenticate with server connected at given channel.\n *\n * @param taskId Task id of the channel\n * @param channel Connection to server to authenticate with.\n */\n private void authenticateOnChannel(Integer taskId, Channel channel) {\n try {\n SaslNettyClient saslNettyClient = channel.attr(SASL).get();\n if (channel.attr(SASL).get() == null) {\n if (LOG.isDebugEnabled()) {\n LOG.debug(\"authenticateOnChannel: Creating saslNettyClient now \" +\n \"for channel: \" + channel);\n }\n saslNettyClient = new SaslNettyClient();\n channel.attr(SASL).set(saslNettyClient);\n }\n if (!saslNettyClient.isComplete()) {\n if (LOG.isDebugEnabled()) {\n LOG.debug(\"authenticateOnChannel: Waiting for authentication \" +\n \"to complete..\");\n }\n SaslTokenMessageRequest saslTokenMessage = saslNettyClient.firstToken();\n sendWritableRequest(taskId, saslTokenMessage);\n // We now wait for Netty's thread pool to communicate over this\n // channel to authenticate with another worker acting as a server.\n try {\n synchronized (saslNettyClient.getAuthenticated()) {\n while (!saslNettyClient.isComplete()) {\n saslNettyClient.getAuthenticated().wait();\n }\n }\n } catch (InterruptedException e) {\n LOG.error(\"authenticateOnChannel: Interrupted while waiting for \" +\n \"authentication.\");\n }\n }\n if (LOG.isDebugEnabled()) {\n LOG.debug(\"authenticateOnChannel: Authentication on channel: \" +\n channel + \" has completed successfully.\");\n }\n } catch (IOException e) {\n LOG.error(\"authenticateOnChannel: Failed to authenticate with server \" +\n \"due to error: \" + e);\n }\n return;\n }\n/*end[HADOOP_NON_SECURE]*/\n\n /**\n * Stop the client.\n */\n public void stop() {\n if (LOG.isInfoEnabled()) {\n LOG.info(\"stop: Halting netty client\");\n }\n // Close connections asynchronously, in a Netty-approved\n // way, without cleaning up thread pools until all channels\n // in addressChannelMap are closed (success or failure)\n int channelCount = 0;\n for (ChannelRotater channelRotater : addressChannelMap.values()) {\n channelCount += channelRotater.size();\n }\n final int done = channelCount;\n final AtomicInteger count = new AtomicInteger(0);\n for (ChannelRotater channelRotater : addressChannelMap.values()) {\n channelRotater.closeChannels(new ChannelFutureListener() {\n @Override\n public void operationComplete(ChannelFuture cf) {\n context.progress();\n if (count.incrementAndGet() == done) {\n if (LOG.isInfoEnabled()) {\n LOG.info(\"stop: reached wait threshold, \" +\n done + \" connections closed, releasing \" +\n \"resources now.\");\n }\n workerGroup.shutdownGracefully();\n if (executionGroup != null) {\n executionGroup.shutdownGracefully();\n }\n }\n }\n });\n }\n ProgressableUtils.awaitTerminationFuture(workerGroup, context);\n if (executionGroup != null) {\n ProgressableUtils.awaitTerminationFuture(executionGroup, context);\n }\n if (LOG.isInfoEnabled()) {\n LOG.info(\"stop: Netty client halted\");\n }\n }\n\n /**\n * Get the next available channel, reconnecting if necessary\n *\n * @param remoteServer Remote server to get a channel for\n * @return Available channel for this remote server\n */\n private Channel getNextChannel(InetSocketAddress remoteServer) {\n Channel channel = addressChannelMap.get(remoteServer).nextChannel();\n if (channel == null) {\n throw new IllegalStateException(\n \"getNextChannel: No channel exists for \" + remoteServer);\n }\n\n // Return this channel if it is connected\n if (channel.isActive()) {\n return channel;\n }\n\n // Get rid of the failed channel\n if (addressChannelMap.get(remoteServer).removeChannel(channel)) {\n LOG.warn(\"getNextChannel: Unlikely event that the channel \" +\n channel + \" was already removed!\");\n }\n if (LOG.isInfoEnabled()) {\n LOG.info(\"getNextChannel: Fixing disconnected channel to \" +\n remoteServer + \", open = \" + channel.isOpen() + \", \" +\n \"bound = \" + channel.isRegistered());\n }\n int reconnectFailures = 0;\n while (reconnectFailures < maxConnectionFailures) {\n ChannelFuture connectionFuture = bootstrap.connect(remoteServer);\n ProgressableUtils.awaitChannelFuture(connectionFuture, context);\n if (connectionFuture.isSuccess()) {\n if (LOG.isInfoEnabled()) {\n LOG.info(\"getNextChannel: Connected to \" + remoteServer + \"!\");\n }\n addressChannelMap.get(remoteServer).addChannel(\n connectionFuture.channel());\n return connectionFuture.channel();\n }\n ++reconnectFailures;\n LOG.warn(\"getNextChannel: Failed to reconnect to \" + remoteServer +\n \" on attempt \" + reconnectFailures + \" out of \" +\n maxConnectionFailures + \" max attempts, sleeping for 5 secs\",\n connectionFuture.cause());\n try {\n Thread.sleep(5000);\n } catch (InterruptedException e) {\n LOG.warn(\"getNextChannel: Unexpected interrupted exception\", e);\n }\n }\n throw new IllegalStateException(\"getNextChannel: Failed to connect \" +\n \"to \" + remoteServer + \" in \" + reconnectFailures +\n \" connect attempts\");\n }\n\n /**\n * Send a request to a remote server (should be already connected)\n *\n * @param destTaskId Destination task id\n * @param request Request to send\n */\n public void sendWritableRequest(Integer destTaskId,\n WritableRequest request) {\n InetSocketAddress remoteServer = taskIdAddressMap.get(destTaskId);\n if (clientRequestIdRequestInfoMap.isEmpty()) {\n inboundByteCounter.resetAll();\n outboundByteCounter.resetAll();\n }\n boolean registerRequest = true;\n/*if_not[HADOOP_NON_SECURE]*/\n if (request.getType() == RequestType.SASL_TOKEN_MESSAGE_REQUEST) {\n registerRequest = false;\n }\n/*end[HADOOP_NON_SECURE]*/\n\n Channel channel = getNextChannel(remoteServer);\n RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);\n if (registerRequest) {\n request.setClientId(myTaskInfo.getTaskId());\n request.setRequestId(\n addressRequestIdGenerator.getNextRequestId(remoteServer));\n ClientRequestId clientRequestId =\n new ClientRequestId(destTaskId, request.getRequestId());\n RequestInfo oldRequestInfo = clientRequestIdRequestInfoMap.putIfAbsent(\n clientRequestId, newRequestInfo);\n if (oldRequestInfo != null) {\n throw new IllegalStateException(\"sendWritableRequest: Impossible to \" +\n \"have a previous request id = \" + request.getRequestId() + \", \" +\n \"request info of \" + oldRequestInfo);\n }\n }\n ChannelFuture writeFuture = channel.write(request);\n newRequestInfo.setWriteFuture(writeFuture);\n\n if (limitNumberOfOpenRequests &&\n clientRequestIdRequestInfoMap.size() > maxNumberOfOpenRequests) {\n waitSomeRequests(maxNumberOfOpenRequests);\n }\n }\n\n /**\n * Ensure all the request sent so far are complete.\n *\n * @throws InterruptedException\n */\n public void waitAllRequests() {\n waitSomeRequests(0);\n if (LOG.isInfoEnabled()) {\n LOG.info(\"waitAllRequests: Finished all requests. \" +\n inboundByteCounter.getMetrics() + \"\\n\" + outboundByteCounter\n .getMetrics());\n }\n }\n\n /**\n * Ensure that at most maxOpenRequests are not complete. Periodically,\n * check the state of every request. If we find the connection failed,\n * re-establish it and re-send the request.\n *\n * @param maxOpenRequests Maximum number of requests which can be not\n * complete\n */\n private void waitSomeRequests(int maxOpenRequests) {\n while (clientRequestIdRequestInfoMap.size() > maxOpenRequests) {\n // Wait for requests to complete for some time\n logInfoAboutOpenRequests(maxOpenRequests);\n synchronized (clientRequestIdRequestInfoMap) {\n if (clientRequestIdRequestInfoMap.size() <= maxOpenRequests) {\n break;\n }\n try {\n clientRequestIdRequestInfoMap.wait(waitingRequestMsecs);\n } catch (InterruptedException e) {\n LOG.error(\"waitSomeRequests: Got unexpected InterruptedException\", e);\n }\n }\n // Make sure that waiting doesn't kill the job\n context.progress();\n\n checkRequestsForProblems();\n }\n }\n\n /**\n * Log the status of open requests.\n *\n * @param maxOpenRequests Maximum number of requests which can be not complete\n */\n private void logInfoAboutOpenRequests(int maxOpenRequests) {\n if (LOG.isInfoEnabled() && requestLogger.isPrintable()) {\n LOG.info(\"logInfoAboutOpenRequests: Waiting interval of \" +\n waitingRequestMsecs + \" msecs, \" +\n clientRequestIdRequestInfoMap.size() +\n \" open requests, waiting for it to be <= \" + maxOpenRequests +\n \", \" + inboundByteCounter.getMetrics() + \"\\n\" +\n outboundByteCounter.getMetrics());\n\n if (clientRequestIdRequestInfoMap.size() < MAX_REQUESTS_TO_LIST) {\n for (Map.Entry<ClientRequestId, RequestInfo> entry :\n clientRequestIdRequestInfoMap.entrySet()) {\n LOG.info(\"logInfoAboutOpenRequests: Waiting for request \" +\n entry.getKey() + \" - \" + entry.getValue());\n }\n }\n\n // Count how many open requests each task has\n Map<Integer, Integer> openRequestCounts = Maps.newHashMap();\n for (ClientRequestId clientRequestId :\n clientRequestIdRequestInfoMap.keySet()) {\n int taskId = clientRequestId.getDestinationTaskId();\n Integer currentCount = openRequestCounts.get(taskId);\n openRequestCounts.put(taskId,\n (currentCount == null ? 0 : currentCount) + 1);\n }\n // Sort it in decreasing order of number of open requests\n List<Map.Entry<Integer, Integer>> sorted =\n Lists.newArrayList(openRequestCounts.entrySet());\n Collections.sort(sorted, new Comparator<Map.Entry<Integer, Integer>>() {\n @Override\n public int compare(Map.Entry<Integer, Integer> entry1,\n Map.Entry<Integer, Integer> entry2) {\n int value1 = entry1.getValue();\n int value2 = entry2.getValue();\n return (value1 < value2) ? 1 : ((value1 == value2) ? 0 : -1);\n }\n });\n // Print task ids which have the most open requests\n StringBuilder message = new StringBuilder();\n message.append(\"logInfoAboutOpenRequests: \");\n int itemsToPrint =\n Math.min(MAX_DESTINATION_TASK_IDS_TO_LIST, sorted.size());\n for (int i = 0; i < itemsToPrint; i++) {\n message.append(sorted.get(i).getValue())\n .append(\" requests for taskId=\")\n .append(sorted.get(i).getKey())\n .append(\", \");\n }\n LOG.info(message);\n }\n }\n\n /**\n * Check if there are some open requests which have been sent a long time\n * ago, and if so resend them.\n */\n private void checkRequestsForProblems() {\n long lastTimeChecked = lastTimeCheckedRequestsForProblems.get();\n // If not enough time passed from the previous check, return\n if (System.currentTimeMillis() < lastTimeChecked + waitingRequestMsecs) {\n return;\n }\n // If another thread did the check already, return\n if (!lastTimeCheckedRequestsForProblems.compareAndSet(lastTimeChecked,\n System.currentTimeMillis())) {\n return;\n }\n List<ClientRequestId> addedRequestIds = Lists.newArrayList();\n List<RequestInfo> addedRequestInfos = Lists.newArrayList();\n // Check all the requests for problems\n for (Map.Entry<ClientRequestId, RequestInfo> entry :\n clientRequestIdRequestInfoMap.entrySet()) {\n RequestInfo requestInfo = entry.getValue();\n ChannelFuture writeFuture = requestInfo.getWriteFuture();\n // Request wasn't sent yet\n if (writeFuture == null) {\n continue;\n }\n // If not connected anymore, request failed, or the request is taking\n // too long, re-establish and resend\n if (!writeFuture.channel().isActive() ||\n (writeFuture.isDone() && !writeFuture.isSuccess()) ||\n (requestInfo.getElapsedMsecs() > maxRequestMilliseconds)) {\n LOG.warn(\"checkRequestsForProblems: Problem with request id \" +\n entry.getKey() + \" connected = \" +\n writeFuture.channel().isActive() +\n \", future done = \" + writeFuture.isDone() + \", \" +\n \"success = \" + writeFuture.isSuccess() + \", \" +\n \"cause = \" + writeFuture.cause() + \", \" +\n \"elapsed time = \" + requestInfo.getElapsedMsecs() + \", \" +\n \"destination = \" + writeFuture.channel().remoteAddress() +\n \" \" + requestInfo);\n addedRequestIds.add(entry.getKey());\n addedRequestInfos.add(new RequestInfo(\n requestInfo.getDestinationAddress(), requestInfo.getRequest()));\n }\n }\n\n // Add any new requests to the system, connect if necessary, and re-send\n for (int i = 0; i < addedRequestIds.size(); ++i) {\n ClientRequestId requestId = addedRequestIds.get(i);\n RequestInfo requestInfo = addedRequestInfos.get(i);\n\n if (clientRequestIdRequestInfoMap.put(requestId, requestInfo) ==\n null) {\n LOG.warn(\"checkRequestsForProblems: Request \" + requestId +\n \" completed prior to sending the next request\");\n clientRequestIdRequestInfoMap.remove(requestId);\n }\n InetSocketAddress remoteServer = requestInfo.getDestinationAddress();\n Channel channel = getNextChannel(remoteServer);\n if (LOG.isInfoEnabled()) {\n LOG.info(\"checkRequestsForProblems: Re-issuing request \" + requestInfo);\n }\n ChannelFuture writeFuture = channel.write(requestInfo.getRequest());\n requestInfo.setWriteFuture(writeFuture);\n }\n addedRequestIds.clear();\n addedRequestInfos.clear();\n }\n\n /**\n * Utility method for resolving addresses\n *\n * @param maxResolveAddressAttempts Maximum number of attempts to resolve the\n * address\n * @param address The address we are attempting to resolve\n * @return The successfully resolved address.\n * @throws IllegalStateException if the address is not resolved\n * in <code>maxResolveAddressAttempts</code> tries.\n */\n private static InetSocketAddress resolveAddress(\n int maxResolveAddressAttempts, InetSocketAddress address) {\n int resolveAttempts = 0;\n while (address.isUnresolved() &&\n resolveAttempts < maxResolveAddressAttempts) {\n ++resolveAttempts;\n LOG.warn(\"resolveAddress: Failed to resolve \" + address +\n \" on attempt \" + resolveAttempts + \" of \" +\n maxResolveAddressAttempts + \" attempts, sleeping for 5 seconds\");\n try {\n Thread.sleep(5000);\n } catch (InterruptedException e) {\n LOG.warn(\"resolveAddress: Interrupted.\", e);\n }\n address = new InetSocketAddress(address.getHostName(),\n address.getPort());\n }\n if (resolveAttempts >= maxResolveAddressAttempts) {\n throw new IllegalStateException(\"resolveAddress: Couldn't \" +\n \"resolve \" + address + \" in \" + resolveAttempts + \" tries.\");\n }\n return address;\n }\n}\n
===================================================================
--- src/main/java/org/apache/giraph/comm/netty/NettyClient.java (revision 01d11687dd2e89f97e0b5c557bbfd0cefb6ed625)
+++ src/main/java/org/apache/giraph/comm/netty/NettyClient.java (revision )
@@ -153,6 +153,10 @@
private final int maxRequestMilliseconds;
/** Waiting internal for checking outstanding requests msecs */
private final int waitingRequestMsecs;
+
+ /** Fix - wait time when connection failed*/
+ private int sleepDelay = 100;
+
/** Timed logger for printing request debugging */
private final TimedLogger requestLogger = new TimedLogger(15 * 1000, LOG);
/** Worker executor group */
@@ -403,6 +407,7 @@
// Wait for all the connections to succeed up to n tries
int failures = 0;
int connected = 0;
+ sleepDelay = 100;
while (failures < maxConnectionFailures) {
List<ChannelFutureAddress> nextCheckFutures = Lists.newArrayList();
for (ChannelFutureAddress waitingConnection : waitingConnectionList) {
@@ -453,6 +458,19 @@
failures + " failures total.");
if (nextCheckFutures.isEmpty()) {
break;
+ } else {
+ try {
+ LOG.info("FIX: Waiting " + sleepDelay +
+ " ms for " + nextCheckFutures.size() + " connections");
+ Thread.sleep(sleepDelay);
+ context.getCounter(
+ "FIX waiting",
+ "Waiting for " + sleepDelay + "ms").increment(1);
+ int delay = (int) Math.round(sleepDelay * 1.2);
+ sleepDelay = Math.max(1000, delay);
+ } catch (InterruptedException e) {
+ LOG.error("Waiting failed:" + e);
+ }
}
waitingConnectionList = nextCheckFutures;
}