Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/429#discussion_r24680701 --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java --- @@ -42,344 +42,577 @@ import org.slf4j.LoggerFactory; import backtype.storm.Config; +import backtype.storm.messaging.ConnectionWithStatus; import backtype.storm.metric.api.IStatefulObject; -import backtype.storm.messaging.IConnection; import backtype.storm.messaging.TaskMessage; import backtype.storm.utils.StormBoundedExponentialBackoffRetry; import backtype.storm.utils.Utils; -public class Client implements IConnection, IStatefulObject{ +/** + * A Netty client for sending task messages to a remote destination (Netty server). + * + * Implementation details: + * + * - Sending messages, i.e. writing to the channel, is performed asynchronously. + * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message + * batch size is configurable. + * - Connecting and reconnecting are performed asynchronously. + * - Note: The current implementation drops any messages that are being enqueued for sending if the connection to + * the remote destination is currently unavailable. + * - A background flusher thread is run in the background. It will, at fixed intervals, check for any pending messages + * (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently + * enabled. 
+ */ +public class Client extends ConnectionWithStatus implements IStatefulObject { + private static final Logger LOG = LoggerFactory.getLogger(Client.class); private static final String PREFIX = "Netty-Client-"; - private final int max_retries; - private final int base_sleep_ms; - private final int max_sleep_ms; + private static final long NO_DELAY_MS = 0L; + private static final long MINIMUM_INITIAL_DELAY_MS = 30000L; + private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L; + private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L; + private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE; + private final StormBoundedExponentialBackoffRetry retryPolicy; - private AtomicReference<Channel> channelRef; private final ClientBootstrap bootstrap; - private InetSocketAddress remote_addr; - - private AtomicInteger totalReconnects; - private AtomicInteger messagesSent; - private AtomicInteger messagesLostReconnect; - private final Random random = new Random(); - private final ChannelFactory factory; - private final int buffer_size; - private boolean closing; - - private int messageBatchSize; - - private AtomicLong pendings; - - Map storm_conf; + private final InetSocketAddress dstAddress; + protected final String dstAddressPrefixedName; + + /** + * The channel used for all write operations from this client to the remote destination. + */ + private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null); + + + /** + * Maximum number of reconnection attempts we will perform after a disconnect before giving up. + */ + private final int maxReconnectionAttempts; + + /** + * Total number of connection attempts. + */ + private final AtomicInteger totalConnectionAttempts = new AtomicInteger(0); + + /** + * Number of connection attempts since the last disconnect. + */ + private final AtomicInteger connectionAttempts = new AtomicInteger(0); + + /** + * Number of messages successfully sent to the remote destination. 
+ */ + private final AtomicInteger messagesSent = new AtomicInteger(0); + + /** + * Number of messages that could not be sent to the remote destination. + */ + private final AtomicInteger messagesLost = new AtomicInteger(0); + + /** + * Number of messages buffered in memory. + */ + private final AtomicLong pendingMessages = new AtomicLong(0); + + /** + * This flag is set to true if and only if a client instance is being closed. + */ + private volatile boolean closing = false; + + /** + * When set to true, then the background flusher thread will flush any pending messages on its next run. + */ + private final AtomicBoolean backgroundFlushingEnabled = new AtomicBoolean(false); + + /** + * The absolute time (in ms) when the next background flush should be performed. + * + * Note: The flush operation will only be performed if backgroundFlushingEnabled is true, too. + */ + private final AtomicLong nextBackgroundFlushTimeMs = new AtomicLong(DISTANT_FUTURE_TIME_MS); + + /** + * The time interval (in ms) at which the background flusher thread will be run to check for any pending messages + * to be flushed. + */ + private final int flushCheckIntervalMs; + + /** + * How many messages should be batched together before sending them to the remote destination. + * + * Messages are batched to optimize network throughput at the expense of latency. 
+ */ + private final int messageBatchSize; private MessageBatch messageBatch = null; - private AtomicLong flushCheckTimer; - private int flushCheckInterval; - private ScheduledExecutorService scheduler; + private final ListeningScheduledExecutorService scheduler; + protected final Map stormConf; @SuppressWarnings("rawtypes") - Client(Map storm_conf, ChannelFactory factory, - ScheduledExecutorService scheduler, String host, int port) { - this.storm_conf = storm_conf; - this.factory = factory; - this.scheduler = scheduler; - channelRef = new AtomicReference<Channel>(null); + Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) { closing = false; - pendings = new AtomicLong(0); - flushCheckTimer = new AtomicLong(Long.MAX_VALUE); - totalReconnects = new AtomicInteger(0); - messagesSent = new AtomicInteger(0); - messagesLostReconnect = new AtomicInteger(0); - - // Configure - buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); - max_retries = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)); - base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)); - max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); - retryPolicy = new StormBoundedExponentialBackoffRetry(base_sleep_ms, max_sleep_ms, max_retries); - - this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144); - - flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms - - LOG.info("New Netty Client, connect to " + host + ", " + port - + ", config: " + ", buffer_size: " + buffer_size); - - bootstrap = new ClientBootstrap(factory); + this.stormConf = stormConf; + this.scheduler = MoreExecutors.listeningDecorator(scheduler); + int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE)); + LOG.info("creating Netty Client, 
connecting to {}:{}, bufferSize: {}", host, port, bufferSize); + messageBatchSize = Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144); + flushCheckIntervalMs = Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); + + maxReconnectionAttempts = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)); + int minWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS)); + int maxWaitMs = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS)); + retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, maxWaitMs, maxReconnectionAttempts); + + // Initiate connection to remote destination + bootstrap = createClientBootstrap(factory, bufferSize); + dstAddress = new InetSocketAddress(host, port); + dstAddressPrefixedName = prefixedName(dstAddress); + connect(NO_DELAY_MS); + + // Launch background flushing thread + pauseBackgroundFlushing(); + long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs * maxReconnectionAttempts); + scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), initialDelayMs, flushCheckIntervalMs, + TimeUnit.MILLISECONDS); + } + + private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) { + ClientBootstrap bootstrap = new ClientBootstrap(factory); bootstrap.setOption("tcpNoDelay", true); - bootstrap.setOption("sendBufferSize", buffer_size); + bootstrap.setOption("sendBufferSize", bufferSize); bootstrap.setOption("keepAlive", true); - - // Set up the pipeline factory. bootstrap.setPipelineFactory(new StormClientPipelineFactory(this)); + return bootstrap; + } - // Start the connection attempt. 
- remote_addr = new InetSocketAddress(host, port); - - // setup the connection asyncly now - scheduler.execute(new Runnable() { - @Override - public void run() { - connect(); - } - }); - - Runnable flusher = new Runnable() { + private String prefixedName(InetSocketAddress dstAddress) { + if (null != dstAddress) { + return PREFIX + dstAddress.toString(); + } + return ""; + } + + private Runnable createBackgroundFlusher() { + return new Runnable() { @Override public void run() { - - if(!closing) { - long flushCheckTime = flushCheckTimer.get(); - long now = System.currentTimeMillis(); - if (now > flushCheckTime) { - Channel channel = channelRef.get(); - if (null != channel && channel.isWritable()) { - flush(channel); - } - } + if(!closing && backgroundFlushingEnabled.get() && nowMillis() > nextBackgroundFlushTimeMs.get()) { + LOG.debug("flushing {} pending messages to {} in background", messageBatch.size(), + dstAddressPrefixedName); + flushPendingMessages(); } - } }; - - long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s - scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS); + } + + private void pauseBackgroundFlushing() { + backgroundFlushingEnabled.set(false); + } + + private void resumeBackgroundFlushing() { + backgroundFlushingEnabled.set(true); + } + + private synchronized void flushPendingMessages() { + Channel channel = channelRef.get(); + if (containsMessages(messageBatch)) { + if (connectionEstablished(channel)) { + if (channel.isWritable()) { + pauseBackgroundFlushing(); + MessageBatch toBeFlushed = messageBatch; + flushMessages(channel, toBeFlushed); + messageBatch = null; + } + else if (closing) { + // Ensure background flushing is enabled so that we definitely have a chance to re-try the flush + // operation in case the client is being gracefully closed (where we have a brief time window where + // the client will wait for pending messages to be sent). 
+ resumeBackgroundFlushing(); + } + } + else { + closeChannelAndReconnect(channel); + } + } + } + + private long nowMillis() { + return System.currentTimeMillis(); } /** * We will retry connection with exponential back-off policy */ - private synchronized void connect() { + private synchronized void connect(long delayMs) { try { + if (closing) { + return; + } - Channel channel = channelRef.get(); - if (channel != null && channel.isConnected()) { + if (connectionEstablished(channelRef.get())) { return; } - int tried = 0; - //setting channel to null to make sure we throw an exception when reconnection fails - channel = null; - while (tried <= max_retries) { - - LOG.info("Reconnect started for {}... [{}]", name(), tried); - LOG.debug("connection started..."); - - totalReconnects.getAndIncrement(); - ChannelFuture future = bootstrap.connect(remote_addr); - future.awaitUninterruptibly(); - Channel current = future.getChannel(); - if (!future.isSuccess()) { - if (null != current) { - current.close(); + connectionAttempts.getAndIncrement(); + if (reconnectingAllowed()) { + totalConnectionAttempts.getAndIncrement(); + LOG.info("connection attempt {} to {} scheduled to run in {} ms", connectionAttempts.get(), + dstAddressPrefixedName, delayMs); + ListenableFuture<Channel> channelFuture = scheduler.schedule( + new Connector(dstAddress, connectionAttempts.get()), delayMs, TimeUnit.MILLISECONDS); + Futures.addCallback(channelFuture, new FutureCallback<Channel>() { + @Override public void onSuccess(Channel result) { + if (connectionEstablished(result)) { + setChannel(result); + LOG.info("connection established to {}", dstAddressPrefixedName); + connectionAttempts.set(0); + } + else { + reconnectAgain(new RuntimeException("Returned channel was actually not established")); + } + } + + @Override public void onFailure(Throwable t) { + reconnectAgain(t); } - } else { - channel = current; - break; - } - Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0)); - tried++; + + private void 
reconnectAgain(Throwable t) { + String baseMsg = String.format("connection attempt %s to %s failed", connectionAttempts, + dstAddressPrefixedName); + String failureMsg = (t == null)? baseMsg : baseMsg + ": " + t.toString(); + LOG.error(failureMsg); + long nextDelayMs = retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0); + connect(nextDelayMs); + } + }); } - if (null != channel) { - LOG.info("connection established to a remote host " + name() + ", " + channel.toString()); - channelRef.set(channel); - } else { + else { close(); - throw new RuntimeException("Remote address is not reachable. We will close this client " + name()); + throw new RuntimeException("Giving up to connect to " + dstAddressPrefixedName + " after " + + connectionAttempts + " failed attempts"); } - } catch (InterruptedException e) { - throw new RuntimeException("connection failed " + name(), e); } + catch (Exception e) { + throw new RuntimeException("Failed to connect to " + dstAddressPrefixedName, e); + } + } + + private void setChannel(Channel channel) { + channelRef.set(channel); + } + + private boolean reconnectingAllowed() { + return !closing && connectionAttempts.get() <= (maxReconnectionAttempts + 1); + } + + private boolean connectionEstablished(Channel channel) { + // Because we are using TCP (which is a connection-oriented transport unlike UDP), a connection is only fully + // established iff the channel is connected. That is, a TCP-based channel must be in the CONNECTED state before + // anything can be read or written to the channel. + // + // See: + // - http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html + // - http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions + return channel != null && channel.isConnected(); } /** - * Enqueue task messages to be sent to server + * Note: Storm will check via this method whether a worker can be activated safely during the initial startup of a + * topology. 
The worker will only be activated once all of the its connections are ready. */ - synchronized public void send(Iterator<TaskMessage> msgs) { + @Override + public Status status() { + if (closing) { + return Status.Closed; + } + else if (!connectionEstablished(channelRef.get())) { + return Status.Connecting; + } + else { + return Status.Ready; + } + } + + /** + * Receiving messages is not supported by a client. + * + * @throws java.lang.UnsupportedOperationException whenever this method is being called. + */ + @Override + public Iterator<TaskMessage> recv(int flags, int clientId) { + throw new UnsupportedOperationException("Client connection should not receive any messages"); + } - // throw exception if the client is being closed + @Override + public void send(int taskId, byte[] payload) { + TaskMessage msg = new TaskMessage(taskId, payload); + List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1); + wrapper.add(msg); + send(wrapper.iterator()); + } + + /** + * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`). + */ + @Override + public synchronized void send(Iterator<TaskMessage> msgs) { if (closing) { - throw new RuntimeException("Client is being closed, and does not take requests any more"); + int numMessages = iteratorSize(msgs); + LOG.warn("discarding {} messages because the Netty client to {} is being closed", numMessages, + dstAddressPrefixedName); + return; } - - if (null == msgs || !msgs.hasNext()) { + + if (!hasMessages(msgs)) { return; } Channel channel = channelRef.get(); - if (null == channel) { - connect(); - channel = channelRef.get(); + if (!connectionEstablished(channel)) { + // Closing the channel and reconnecting should be done before handling the messages. + closeChannelAndReconnect(channel); + handleMessagesWhenConnectionIsUnavailable(msgs); + return; } + // Collect messages into batches (to optimize network throughput), then flush them. 
while (msgs.hasNext()) { - if (!channel.isConnected()) { - connect(); - channel = channelRef.get(); - } TaskMessage message = msgs.next(); - if (null == messageBatch) { + if (messageBatch == null) { messageBatch = new MessageBatch(messageBatchSize); } messageBatch.add(message); if (messageBatch.isFull()) { MessageBatch toBeFlushed = messageBatch; - flushRequest(channel, toBeFlushed); + flushMessages(channel, toBeFlushed); messageBatch = null; } } - if (null != messageBatch && !messageBatch.isEmpty()) { - if (channel.isWritable()) { - flushCheckTimer.set(Long.MAX_VALUE); - - // Flush as fast as we can to reduce the latency + // Handle any remaining messages in case the "last" batch was not full. + if (containsMessages(messageBatch)) { + if (connectionEstablished(channel) && channel.isWritable()) { + // We can write to the channel, so we flush the remaining messages immediately to minimize latency. + pauseBackgroundFlushing(); MessageBatch toBeFlushed = messageBatch; messageBatch = null; - flushRequest(channel, toBeFlushed); - - } else { - // when channel is NOT writable, it means the internal netty buffer is full. - // In this case, we can try to buffer up more incoming messages. - flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval); + flushMessages(channel, toBeFlushed); + } + else { + // We cannot write to the channel, which means Netty's internal write buffer is full. + // In this case, we buffer the remaining messages and wait for the next messages to arrive. + // + // Background: + // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K). + // This represents the amount of data waiting to be flushed to operating system buffers. If the + // outstanding data exceeds this value then the channel is set to non-writable. When this happens, a + // INTEREST_CHANGED channel event is triggered. Netty sets the channel to writable again once the data + // has been flushed to the system buffers. 
+ // + // See http://stackoverflow.com/questions/14049260 + resumeBackgroundFlushing(); + nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs); } } } - public String name() { - if (null != remote_addr) { - return PREFIX + remote_addr.toString(); - } - return ""; + private boolean hasMessages(Iterator<TaskMessage> msgs) { + return msgs != null && msgs.hasNext(); } - private synchronized void flush(Channel channel) { - if (!closing) { - if (null != messageBatch && !messageBatch.isEmpty()) { - MessageBatch toBeFlushed = messageBatch; - flushCheckTimer.set(Long.MAX_VALUE); - flushRequest(channel, toBeFlushed); - messageBatch = null; + /** + * We will drop pending messages and let at-least-once message replay kick in. + * + * Another option would be to buffer the messages in memory. But this option has the risk of causing OOM errors, + * especially for topologies that disable message acking because we don't know whether the connection recovery will + * succeed or not, and how long the recovery will take. + */ + private void handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) { + LOG.error("connection to {} is unavailable", dstAddressPrefixedName); + dropPendingMessages(msgs); + } + + private void dropPendingMessages(Iterator<TaskMessage> msgs) { + // We consume the iterator by traversing and thus "emptying" it. + int msgCount = iteratorSize(msgs); --- End diff -- We are throwing away messages here. It would be nice to include the number of dropped messages in the state, so that we can know how many messages we threw away while waiting for a worker to come up.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it to be, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---