Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/429#discussion_r24681006 --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java --- @@ -42,344 +42,577 @@ import org.slf4j.LoggerFactory; import backtype.storm.Config; +import backtype.storm.messaging.ConnectionWithStatus; import backtype.storm.metric.api.IStatefulObject; -import backtype.storm.messaging.IConnection; import backtype.storm.messaging.TaskMessage; import backtype.storm.utils.StormBoundedExponentialBackoffRetry; import backtype.storm.utils.Utils; -public class Client implements IConnection, IStatefulObject{ +/** + * A Netty client for sending task messages to a remote destination (Netty server). + * + * Implementation details: + * + * - Sending messages, i.e. writing to the channel, is performed asynchronously. + * - Messages are sent in batches to optimize for network throughput at the expense of network latency. The message + * batch size is configurable. + * - Connecting and reconnecting are performed asynchronously. + * - Note: The current implementation drops any messages that are being enqueued for sending if the connection to + * the remote destination is currently unavailable. + * - A background flusher thread is run in the background. It will, at fixed intervals, check for any pending messages + * (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently + * enabled. 
+ */ +public class Client extends ConnectionWithStatus implements IStatefulObject { + private static final Logger LOG = LoggerFactory.getLogger(Client.class); private static final String PREFIX = "Netty-Client-"; - private final int max_retries; - private final int base_sleep_ms; - private final int max_sleep_ms; + private static final long NO_DELAY_MS = 0L; + private static final long MINIMUM_INITIAL_DELAY_MS = 30000L; + private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L; + private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L; + private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE; + private final StormBoundedExponentialBackoffRetry retryPolicy; - private AtomicReference<Channel> channelRef; private final ClientBootstrap bootstrap; - private InetSocketAddress remote_addr; - - private AtomicInteger totalReconnects; - private AtomicInteger messagesSent; - private AtomicInteger messagesLostReconnect; - private final Random random = new Random(); - private final ChannelFactory factory; - private final int buffer_size; - private boolean closing; - - private int messageBatchSize; - - private AtomicLong pendings; - - Map storm_conf; + private final InetSocketAddress dstAddress; + protected final String dstAddressPrefixedName; + + /** + * The channel used for all write operations from this client to the remote destination. + */ + private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null); + + + /** + * Maximum number of reconnection attempts we will perform after a disconnect before giving up. + */ + private final int maxReconnectionAttempts; --- End diff -- I'm not sure we want a maximum number of reconnection attempts, or if we have it we need to think about how we intend to recover from it. 
This could be handled in a separate JIRA, but we ran into a situation where we hit the maximum number of reconnection attempts and the resulting exception was silently swallowed: it was thrown from a background thread, so it simply killed that thread without ever being reported. This code appears to have the same problem.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and you wish to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---