Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/429#discussion_r24680701
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -42,344 +42,577 @@
     import org.slf4j.LoggerFactory;
     
     import backtype.storm.Config;
    +import backtype.storm.messaging.ConnectionWithStatus;
     import backtype.storm.metric.api.IStatefulObject;
    -import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
     import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
     import backtype.storm.utils.Utils;
     
    -public class Client implements IConnection, IStatefulObject{
    +/**
    + * A Netty client for sending task messages to a remote destination (Netty 
server).
    + *
    + * Implementation details:
    + *
    + * - Sending messages, i.e. writing to the channel, is performed 
asynchronously.
    + * - Messages are sent in batches to optimize for network throughput at 
the expense of network latency.  The message
    + *   batch size is configurable.
    + * - Connecting and reconnecting are performed asynchronously.
    + *     - Note: The current implementation drops any messages that are 
being enqueued for sending if the connection to
    + *       the remote destination is currently unavailable.
    + * - A background flusher thread is run in the background.  It will, at 
fixed intervals, check for any pending messages
    + *   (i.e. messages buffered in memory) and flush them to the remote 
destination iff background flushing is currently
    + *   enabled.
    + */
    +public class Client extends ConnectionWithStatus implements 
IStatefulObject {
    +
         private static final Logger LOG = 
LoggerFactory.getLogger(Client.class);
         private static final String PREFIX = "Netty-Client-";
    -    private final int max_retries;
    -    private final int base_sleep_ms;
    -    private final int max_sleep_ms;
    +    private static final long NO_DELAY_MS = 0L;
    +    private static final long MINIMUM_INITIAL_DELAY_MS = 30000L;
    +    private static final long PENDING_MESSAGES_FLUSH_TIMEOUT_MS = 600000L;
    +    private static final long PENDING_MESSAGES_FLUSH_INTERVAL_MS = 1000L;
    +    private static final long DISTANT_FUTURE_TIME_MS = Long.MAX_VALUE;
    +
         private final StormBoundedExponentialBackoffRetry retryPolicy;
    -    private AtomicReference<Channel> channelRef;
         private final ClientBootstrap bootstrap;
    -    private InetSocketAddress remote_addr;
    -    
    -    private AtomicInteger totalReconnects;
    -    private AtomicInteger messagesSent;
    -    private AtomicInteger messagesLostReconnect;
    -    private final Random random = new Random();
    -    private final ChannelFactory factory;
    -    private final int buffer_size;
    -    private boolean closing;
    -
    -    private int messageBatchSize;
    -    
    -    private AtomicLong pendings;
    -    
    -    Map storm_conf;
    +    private final InetSocketAddress dstAddress;
    +    protected final String dstAddressPrefixedName;
    +
    +    /**
    +     * The channel used for all write operations from this client to the 
remote destination.
    +     */
    +    private final AtomicReference<Channel> channelRef = new 
AtomicReference<Channel>(null);
    +
    +
    +    /**
    +     * Maximum number of reconnection attempts we will perform after a 
disconnect before giving up.
    +     */
    +    private final int maxReconnectionAttempts;
    +
    +    /**
    +     * Total number of connection attempts.
    +     */
    +    private final AtomicInteger totalConnectionAttempts = new 
AtomicInteger(0);
    +
    +    /**
    +     * Number of connection attempts since the last disconnect.
    +     */
    +    private final AtomicInteger connectionAttempts = new AtomicInteger(0);
    +
    +    /**
    +     * Number of messages successfully sent to the remote destination.
    +     */
    +    private final AtomicInteger messagesSent = new AtomicInteger(0);
    +
    +    /**
    +     * Number of messages that could not be sent to the remote destination.
    +     */
    +    private final AtomicInteger messagesLost = new AtomicInteger(0);
    +
    +    /**
    +     * Number of messages buffered in memory.
    +     */
    +    private final AtomicLong pendingMessages = new AtomicLong(0);
    +
    +    /**
    +     * This flag is set to true if and only if a client instance is being 
closed.
    +     */
    +    private volatile boolean closing = false;
    +
    +    /**
    +     * When set to true, then the background flusher thread will flush any 
pending messages on its next run.
    +     */
    +    private final AtomicBoolean backgroundFlushingEnabled = new 
AtomicBoolean(false);
    +
    +    /**
    +     * The absolute time (in ms) when the next background flush should be 
performed.
    +     *
    +     * Note: The flush operation will only be performed if 
backgroundFlushingEnabled is true, too.
    +     */
    +    private final AtomicLong nextBackgroundFlushTimeMs = new 
AtomicLong(DISTANT_FUTURE_TIME_MS);
    +
    +    /**
    +     * The time interval (in ms) at which the background flusher thread 
will be run to check for any pending messages
    +     * to be flushed.
    +     */
    +    private final int flushCheckIntervalMs;
    +
    +    /**
    +     * How many messages should be batched together before sending them to 
the remote destination.
    +     *
    +     * Messages are batched to optimize network throughput at the expense 
of latency.
    +     */
    +    private final int messageBatchSize;
     
         private MessageBatch messageBatch = null;
    -    private AtomicLong flushCheckTimer;
    -    private int flushCheckInterval;
    -    private ScheduledExecutorService scheduler;
    +    private final ListeningScheduledExecutorService scheduler;
    +    protected final Map stormConf;
     
         @SuppressWarnings("rawtypes")
    -    Client(Map storm_conf, ChannelFactory factory, 
    -            ScheduledExecutorService scheduler, String host, int port) {
    -           this.storm_conf = storm_conf;
    -        this.factory = factory;
    -        this.scheduler = scheduler;
    -        channelRef = new AtomicReference<Channel>(null);
    +    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService 
scheduler, String host, int port) {
             closing = false;
    -        pendings = new AtomicLong(0);
    -        flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
    -        totalReconnects = new AtomicInteger(0);
    -        messagesSent = new AtomicInteger(0);
    -        messagesLostReconnect = new AtomicInteger(0);
    -
    -        // Configure
    -        buffer_size = 
Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
    -        max_retries = 
Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
    -        base_sleep_ms = 
Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
    -        max_sleep_ms = 
Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
    -        retryPolicy = new 
StormBoundedExponentialBackoffRetry(base_sleep_ms, max_sleep_ms, max_retries);
    -
    -        this.messageBatchSize = 
Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
    -        
    -        flushCheckInterval = 
Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); 
// default 10 ms
    -
    -        LOG.info("New Netty Client, connect to " + host + ", " + port
    -                + ", config: " + ", buffer_size: " + buffer_size);
    -
    -        bootstrap = new ClientBootstrap(factory);
    +        this.stormConf = stormConf;
    +        this.scheduler =  MoreExecutors.listeningDecorator(scheduler);
    +        int bufferSize = 
Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
    +        LOG.info("creating Netty Client, connecting to {}:{}, bufferSize: 
{}", host, port, bufferSize);
    +        messageBatchSize = 
Utils.getInt(stormConf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
    +        flushCheckIntervalMs = 
Utils.getInt(stormConf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10);
    +
    +        maxReconnectionAttempts = 
Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES));
    +        int minWaitMs = 
Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
    +        int maxWaitMs = 
Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
    +        retryPolicy = new StormBoundedExponentialBackoffRetry(minWaitMs, 
maxWaitMs, maxReconnectionAttempts);
    +
    +        // Initiate connection to remote destination
    +        bootstrap = createClientBootstrap(factory, bufferSize);
    +        dstAddress = new InetSocketAddress(host, port);
    +        dstAddressPrefixedName = prefixedName(dstAddress);
    +        connect(NO_DELAY_MS);
    +
    +        // Launch background flushing thread
    +        pauseBackgroundFlushing();
    +        long initialDelayMs = Math.min(MINIMUM_INITIAL_DELAY_MS, maxWaitMs 
* maxReconnectionAttempts);
    +        scheduler.scheduleWithFixedDelay(createBackgroundFlusher(), 
initialDelayMs, flushCheckIntervalMs,
    +            TimeUnit.MILLISECONDS);
    +    }
    +
    +    private ClientBootstrap createClientBootstrap(ChannelFactory factory, 
int bufferSize) {
    +        ClientBootstrap bootstrap = new ClientBootstrap(factory);
             bootstrap.setOption("tcpNoDelay", true);
    -        bootstrap.setOption("sendBufferSize", buffer_size);
    +        bootstrap.setOption("sendBufferSize", bufferSize);
             bootstrap.setOption("keepAlive", true);
    -
    -        // Set up the pipeline factory.
             bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
    +        return bootstrap;
    +    }
     
    -        // Start the connection attempt.
    -        remote_addr = new InetSocketAddress(host, port);
    -        
    -        // setup the connection asyncly now
    -        scheduler.execute(new Runnable() {
    -            @Override
    -            public void run() {   
    -                connect();
    -            }
    -        });
    -        
    -        Runnable flusher = new Runnable() {
    +    private String prefixedName(InetSocketAddress dstAddress) {
    +        if (null != dstAddress) {
    +            return PREFIX + dstAddress.toString();
    +        }
    +        return "";
    +    }
    +
    +    private Runnable createBackgroundFlusher() {
    +        return new Runnable() {
                 @Override
                 public void run() {
    -
    -                if(!closing) {
    -                    long flushCheckTime = flushCheckTimer.get();
    -                    long now = System.currentTimeMillis();
    -                    if (now > flushCheckTime) {
    -                        Channel channel = channelRef.get();
    -                        if (null != channel && channel.isWritable()) {
    -                            flush(channel);
    -                        }
    -                    }
    +                if(!closing && backgroundFlushingEnabled.get() && 
nowMillis() > nextBackgroundFlushTimeMs.get()) {
    +                    LOG.debug("flushing {} pending messages to {} in 
background", messageBatch.size(),
    +                        dstAddressPrefixedName);
    +                    flushPendingMessages();
                     }
    -                
                 }
             };
    -        
    -        long initialDelay = Math.min(30L * 1000, max_sleep_ms * 
max_retries); //max wait for 30s
    -        scheduler.scheduleWithFixedDelay(flusher, initialDelay, 
flushCheckInterval, TimeUnit.MILLISECONDS);
    +    }
    +
    +    private void pauseBackgroundFlushing() {
    +        backgroundFlushingEnabled.set(false);
    +    }
    +
    +    private void resumeBackgroundFlushing() {
    +        backgroundFlushingEnabled.set(true);
    +    }
    +
    +    private synchronized void flushPendingMessages() {
    +        Channel channel = channelRef.get();
    +        if (containsMessages(messageBatch)) {
    +            if (connectionEstablished(channel)) {
    +                if (channel.isWritable()) {
    +                    pauseBackgroundFlushing();
    +                    MessageBatch toBeFlushed = messageBatch;
    +                    flushMessages(channel, toBeFlushed);
    +                    messageBatch = null;
    +                }
    +                else if (closing) {
    +                    // Ensure background flushing is enabled so that we 
definitely have a chance to re-try the flush
    +                    // operation in case the client is being gracefully 
closed (where we have a brief time window where
    +                    // the client will wait for pending messages to be 
sent).
    +                    resumeBackgroundFlushing();
    +                }
    +            }
    +            else {
    +                closeChannelAndReconnect(channel);
    +            }
    +        }
    +    }
    +
    +    private long nowMillis() {
    +        return System.currentTimeMillis();
         }
     
         /**
          * We will retry connection with exponential back-off policy
          */
    -    private synchronized void connect() {
    +    private synchronized void connect(long delayMs) {
             try {
    +            if (closing) {
    +                return;
    +            }
     
    -            Channel channel = channelRef.get();
    -            if (channel != null && channel.isConnected()) {
    +            if (connectionEstablished(channelRef.get())) {
                     return;
                 }
     
    -            int tried = 0;
    -            //setting channel to null to make sure we throw an exception 
when reconnection fails
    -            channel = null;
    -            while (tried <= max_retries) {
    -
    -                LOG.info("Reconnect started for {}... [{}]", name(), 
tried);
    -                LOG.debug("connection started...");
    -
    -                totalReconnects.getAndIncrement();
    -                ChannelFuture future = bootstrap.connect(remote_addr);
    -                future.awaitUninterruptibly();
    -                Channel current = future.getChannel();
    -                if (!future.isSuccess()) {
    -                    if (null != current) {
    -                        current.close();
    +            connectionAttempts.getAndIncrement();
    +            if (reconnectingAllowed()) {
    +                totalConnectionAttempts.getAndIncrement();
    +                LOG.info("connection attempt {} to {} scheduled to run in 
{} ms", connectionAttempts.get(),
    +                    dstAddressPrefixedName, delayMs);
    +                ListenableFuture<Channel> channelFuture = 
scheduler.schedule(
    +                    new Connector(dstAddress, connectionAttempts.get()), 
delayMs, TimeUnit.MILLISECONDS);
    +                Futures.addCallback(channelFuture, new 
FutureCallback<Channel>() {
    +                    @Override public void onSuccess(Channel result) {
    +                        if (connectionEstablished(result)) {
    +                            setChannel(result);
    +                            LOG.info("connection established to {}", 
dstAddressPrefixedName);
    +                            connectionAttempts.set(0);
    +                        }
    +                        else {
    +                            reconnectAgain(new RuntimeException("Returned 
channel was actually not established"));
    +                        }
    +                    }
    +
    +                    @Override public void onFailure(Throwable t) {
    +                        reconnectAgain(t);
                         }
    -                } else {
    -                    channel = current;
    -                    break;
    -                }
    -                Thread.sleep(retryPolicy.getSleepTimeMs(tried, 0));
    -                tried++;  
    +
    +                    private void reconnectAgain(Throwable t) {
    +                        String baseMsg = String.format("connection attempt 
%s to %s failed", connectionAttempts,
    +                            dstAddressPrefixedName);
    +                        String failureMsg = (t == null)? baseMsg : baseMsg 
+ ": " + t.toString();
    +                        LOG.error(failureMsg);
    +                        long nextDelayMs = 
retryPolicy.getSleepTimeMs(connectionAttempts.get(), 0);
    +                        connect(nextDelayMs);
    +                    }
    +                });
                 }
    -            if (null != channel) {
    -                LOG.info("connection established to a remote host " + 
name() + ", " + channel.toString());
    -                channelRef.set(channel);
    -            } else {
    +            else {
                     close();
    -                throw new RuntimeException("Remote address is not 
reachable. We will close this client " + name());
    +                throw new RuntimeException("Giving up to connect to " + 
dstAddressPrefixedName + " after " +
    +                    connectionAttempts + " failed attempts");
                 }
    -        } catch (InterruptedException e) {
    -            throw new RuntimeException("connection failed " + name(), e);
             }
    +        catch (Exception e) {
    +            throw new RuntimeException("Failed to connect to " + 
dstAddressPrefixedName, e);
    +        }
    +    }
    +
    +    private void setChannel(Channel channel) {
    +        channelRef.set(channel);
    +    }
    +
    +    private boolean reconnectingAllowed() {
    +        return !closing && connectionAttempts.get() <= 
(maxReconnectionAttempts + 1);
    +    }
    +
    +    private boolean connectionEstablished(Channel channel) {
    +        // Because we are using TCP (which is a connection-oriented 
transport unlike UDP), a connection is only fully
    +        // established iff the channel is connected.  That is, a TCP-based 
channel must be in the CONNECTED state before
    +        // anything can be read or written to the channel.
    +        //
    +        // See:
    +        // - 
http://netty.io/3.9/api/org/jboss/netty/channel/ChannelEvent.html
    +        // - 
http://stackoverflow.com/questions/13356622/what-are-the-netty-channel-state-transitions
    +        return channel != null && channel.isConnected();
         }
     
         /**
    -     * Enqueue task messages to be sent to server
    +     * Note:  Storm will check via this method whether a worker can be 
activated safely during the initial startup of a
    +     * topology.  The worker will only be activated once all of the its 
connections are ready.
          */
    -    synchronized public void send(Iterator<TaskMessage> msgs) {
    +    @Override
    +    public Status status() {
    +        if (closing) {
    +            return Status.Closed;
    +        }
    +        else if (!connectionEstablished(channelRef.get())) {
    +            return Status.Connecting;
    +        }
    +        else {
    +            return Status.Ready;
    +        }
    +    }
    +
    +    /**
    +     * Receiving messages is not supported by a client.
    +     *
    +     * @throws java.lang.UnsupportedOperationException whenever this 
method is being called.
    +     */
    +    @Override
    +    public Iterator<TaskMessage> recv(int flags, int clientId) {
    +        throw new UnsupportedOperationException("Client connection should 
not receive any messages");
    +    }
     
    -        // throw exception if the client is being closed
    +    @Override
    +    public void send(int taskId, byte[] payload) {
    +        TaskMessage msg = new TaskMessage(taskId, payload);
    +        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
    +        wrapper.add(msg);
    +        send(wrapper.iterator());
    +    }
    +
    +    /**
    +     * Enqueue task messages to be sent to the remote destination (cf. 
`host` and `port`).
    +     */
    +    @Override
    +    public synchronized void send(Iterator<TaskMessage> msgs) {
             if (closing) {
    -            throw new RuntimeException("Client is being closed, and does 
not take requests any more");
    +            int numMessages = iteratorSize(msgs);
    +            LOG.warn("discarding {} messages because the Netty client to 
{} is being closed", numMessages,
    +                dstAddressPrefixedName);
    +            return;
             }
    -        
    -        if (null == msgs || !msgs.hasNext()) {
    +
    +        if (!hasMessages(msgs)) {
                 return;
             }
     
             Channel channel = channelRef.get();
    -        if (null == channel) {
    -            connect();
    -            channel = channelRef.get();
    +        if (!connectionEstablished(channel)) {
    +            // Closing the channel and reconnecting should be done before 
handling the messages.
    +            closeChannelAndReconnect(channel);
    +            handleMessagesWhenConnectionIsUnavailable(msgs);
    +            return;
             }
     
    +        // Collect messages into batches (to optimize network throughput), 
then flush them.
             while (msgs.hasNext()) {
    -            if (!channel.isConnected()) {
    -                connect();
    -                channel = channelRef.get();
    -            }
                 TaskMessage message = msgs.next();
    -            if (null == messageBatch) {
    +            if (messageBatch == null) {
                     messageBatch = new MessageBatch(messageBatchSize);
                 }
     
                 messageBatch.add(message);
                 if (messageBatch.isFull()) {
                     MessageBatch toBeFlushed = messageBatch;
    -                flushRequest(channel, toBeFlushed);
    +                flushMessages(channel, toBeFlushed);
                     messageBatch = null;
                 }
             }
     
    -        if (null != messageBatch && !messageBatch.isEmpty()) {
    -            if (channel.isWritable()) {
    -                flushCheckTimer.set(Long.MAX_VALUE);
    -                
    -                // Flush as fast as we can to reduce the latency
    +        // Handle any remaining messages in case the "last" batch was not 
full.
    +        if (containsMessages(messageBatch)) {
    +            if (connectionEstablished(channel) && channel.isWritable()) {
    +                // We can write to the channel, so we flush the remaining 
messages immediately to minimize latency.
    +                pauseBackgroundFlushing();
                     MessageBatch toBeFlushed = messageBatch;
                     messageBatch = null;
    -                flushRequest(channel, toBeFlushed);
    -                
    -            } else {
    -                // when channel is NOT writable, it means the internal 
netty buffer is full. 
    -                // In this case, we can try to buffer up more incoming 
messages.
    -                flushCheckTimer.set(System.currentTimeMillis() + 
flushCheckInterval);
    +                flushMessages(channel, toBeFlushed);
    +            }
    +            else {
    +                // We cannot write to the channel, which means Netty's 
internal write buffer is full.
    +                // In this case, we buffer the remaining messages and wait 
for the next messages to arrive.
    +                //
    +                // Background:
    +                // Netty 3.x maintains an internal write buffer with a 
high water mark for each channel (default: 64K).
    +                // This represents the amount of data waiting to be 
flushed to operating system buffers.  If the
    +                // outstanding data exceeds this value then the channel is 
set to non-writable.  When this happens, a
    +                // INTEREST_CHANGED channel event is triggered.  Netty 
sets the channel to writable again once the data
    +                // has been flushed to the system buffers.
    +                //
    +                // See http://stackoverflow.com/questions/14049260
    +                resumeBackgroundFlushing();
    +                nextBackgroundFlushTimeMs.set(nowMillis() + 
flushCheckIntervalMs);
                 }
             }
     
         }
     
    -    public String name() {
    -        if (null != remote_addr) {
    -            return PREFIX + remote_addr.toString();
    -        }
    -        return "";
    +    private boolean hasMessages(Iterator<TaskMessage> msgs) {
    +        return msgs != null && msgs.hasNext();
         }
     
    -    private synchronized void flush(Channel channel) {
    -        if (!closing) {
    -            if (null != messageBatch && !messageBatch.isEmpty()) {
    -                MessageBatch toBeFlushed = messageBatch;
    -                flushCheckTimer.set(Long.MAX_VALUE);
    -                flushRequest(channel, toBeFlushed);
    -                messageBatch = null;
    +    /**
    +     * We will drop pending messages and let at-least-once message replay 
kick in.
    +     *
    +     * Another option would be to buffer the messages in memory.  But this 
option has the risk of causing OOM errors,
    +     * especially for topologies that disable message acking because we 
don't know whether the connection recovery will
    +     * succeed  or not, and how long the recovery will take.
    +     */
    +    private void 
handleMessagesWhenConnectionIsUnavailable(Iterator<TaskMessage> msgs) {
    +        LOG.error("connection to {} is unavailable", 
dstAddressPrefixedName);
    +        dropPendingMessages(msgs);
    +    }
    +
    +    private void dropPendingMessages(Iterator<TaskMessage> msgs) {
    +        // We consume the iterator by traversing and thus "emptying" it.
    +        int msgCount = iteratorSize(msgs);
    --- End diff --
    
    We are throwing away messages, it would be nice to include that in the 
state so we can know how many messages we threw away waiting for a worker to 
come up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to