GitHub user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/429#discussion_r24681006
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -42,344 +42,577 @@
     import org.slf4j.LoggerFactory;
     
     import backtype.storm.Config;
    +import backtype.storm.messaging.ConnectionWithStatus;
     import backtype.storm.metric.api.IStatefulObject;
    -import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
     import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
     import backtype.storm.utils.Utils;
     
    -public class Client implements IConnection, IStatefulObject{
    +/**
    + * A Netty client for sending task messages to a remote destination (Netty server).
    + *
    + * Implementation details:
    + *
    + * - Sending messages, i.e. writing to the channel, is performed asynchronously.
    + * - Messages are sent in batches to optimize for network throughput at the expense of network latency.  The message
    + *   batch size is configurable.
    + * - Connecting and reconnecting are performed asynchronously.
    + *     - Note: The current implementation drops any messages that are being enqueued for sending if the connection to
    + *       the remote destination is currently unavailable.
    + * - A background flusher thread is run in the background.  It will, at fixed intervals, check for any pending messages
    + *   (i.e. messages buffered in memory) and flush them to the remote destination iff background flushing is currently
    + *   enabled.
    + */
    +public class Client extends ConnectionWithStatus implements IStatefulObject {
    +
         private static final Logger LOG = LoggerFactory.getLogger(Client.class);
         private static final String PREFIX = "Netty-Client-";
    -    private final int max_retries;
    -    private final int base_sleep_ms;
    -    private final int max_sleep_ms;
    +    private static final long NO_DELAY_MS = 0L;
    +    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
    +    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
    +    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
    +    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
    +
         private final StormBoundedExponentialBackoffRetry retryPolicy;
    -    private AtomicReference<Channel> channelRef;
         private final ClientBootstrap bootstrap;
    -    private InetSocketAddress remote_addr;
    -    
    -    private AtomicInteger totalReconnects;
    -    private AtomicInteger messagesSent;
    -    private AtomicInteger messagesLostReconnect;
    -    private final Random random = new Random();
    -    private final ChannelFactory factory;
    -    private final int buffer_size;
    -    private boolean closing;
    -
    -    private int messageBatchSize;
    -    
    -    private AtomicLong pendings;
    -    
    -    Map storm_conf;
    +    private final InetSocketAddress dstAddress;
    +    protected final String dstAddressPrefixedName;
    +
    +    /**
    +     * The channel used for all write operations from this client to the remote destination.
    +     */
    +    private final AtomicReference<Channel> channelRef = new AtomicReference<Channel>(null);
    +
    +
    +    /**
    +     * Maximum number of reconnection attempts we will perform after a disconnect before giving up.
    +     */
    +    private final int maxReconnectionAttempts;
    --- End diff --
    
    I'm not sure we want a maximum number of reconnection attempts at all; if
    we keep one, we need to think about how we intend to recover once it is
    reached.  This could be a separate JIRA, but we ran into a situation where
    we hit the maximum number of reconnection attempts and the exception was
    swallowed: it was thrown from a background thread, so it only killed that
    thread and nothing else noticed.  This code appears to do the same thing.
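
To illustrate the failure mode, here is a minimal, self-contained sketch. It is not code from this pull request or from Storm: the class `ReconnectFailureDemo` and the methods `reconnectTask` and `runAndCapture` are hypothetical names invented for the example, and the "reconnect loop" only pretends to fail. It shows that an exception thrown on a background thread terminates only that thread, and that one possible way to make the owner aware of it is a `Thread.UncaughtExceptionHandler`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of Storm or of this pull request.
final class ReconnectFailureDemo {

    /** Stand-in for a reconnect loop that gives up after maxAttempts tries. */
    static Runnable reconnectTask(final int maxAttempts) {
        return new Runnable() {
            @Override
            public void run() {
                for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                    // Assumption for the demo: every connection attempt fails.
                }
                throw new RuntimeException(
                        "Giving up after " + maxAttempts + " reconnection attempts");
            }
        };
    }

    /**
     * Runs the task on a background thread and returns the exception that
     * killed that thread, or null if the task finished normally.
     */
    static Throwable runAndCapture(Runnable task) {
        final AtomicReference<Throwable> failure = new AtomicReference<Throwable>();
        Thread worker = new Thread(task, "reconnect-demo");
        // Without a handler the exception only ends the worker thread;
        // the thread that owns the connection never learns about it.
        worker.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                failure.set(e);
            }
        });
        worker.start();
        try {
            worker.join(); // the handler has run by the time join() returns
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for worker", ie);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable failure = runAndCapture(reconnectTask(3));
        // The owning thread can now decide how to recover (retry, close, exit).
        System.out.println("Worker failed: " + (failure != null));
    }
}
```

Whatever mechanism is chosen, the point is that the failure has to reach code that can act on it; how the client should then recover is the open question raised above.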

