Looks perfect except for one thing: don't allow clients to put
WriteRequests into the queue while you're draining it. If other threads
can keep enqueuing WriteRequests, the loop is unbounded and you risk
getting stuck writing for only one channel. I added a synchronized block
below.

On Mon, Dec 5, 2011 at 4:07 AM, Julien Vermillard <jvermill...@gmail.com> wrote:

> Hi,
> snipped a lot of the message :)
> >> So, do you mean that the underlying layer will not allow us to push,
> >> say, 20M without informing the session that it's full? In other words,
> >> there is a limited size that can be pushed and we don't have to take
> >> care of this limit ourselves?
> >
> >
> >>
> > Sort of. If the TCP send window (OS layer) has less room in it than
> > outputBuffer.remaining(), the write will only write a portion of
> > outputBuffer. Consider this the CONGESTION_CONTROLLED state. If the TCP
> > send window is full when you try to write, the write will return 0. The
> > algorithm should never see this case because you should always stop
> > trying to write when only a portion of the outputBuffer is written. And
> > always continue to try to write when an entire outputBuffer is written
> > and there are more outputBuffers to write in the output queue.
> >
>
> Here is the write algorithm used in trunk (3.0). We give up writing if
> the buffer is not totally written, because we consider the kernel
> buffer to be full or congested:
>
> Queue<WriteRequest> queue = session.getWriteQueue();
>
> do {
>    // get a write request from the queue
>
      synchronized (queue) {

>    WriteRequest wreq = queue.peek();
>    if (wreq == null) {
>        break;
>    }
>    ByteBuffer buf = (ByteBuffer) wreq.getMessage();
>
>
>    int wrote = session.getSocketChannel().write(buf);
>    if (LOGGER.isDebugEnabled()) {
>        LOGGER.debug("wrote {} bytes to {}", wrote, session);
>    }
>
>    if (buf.remaining() == 0) {
>        // completed write request, let's remove
>        // it
>        queue.remove();
>        // complete the future
>        DefaultWriteFuture future = (DefaultWriteFuture) wreq.getFuture();
>        if (future != null) {
>            future.complete();
>        }
>    } else {
>        // output socket buffer is full, we need
>        // to give up until next selection for
>        // writing
>        break;
>    }
>
>     } // end synchronized(queue)


> } while (!queue.isEmpty());
>
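
To make the point about draining concrete, here is a minimal standalone
sketch, not the MINA code itself. It holds the queue lock for the whole
drain, so no new request can be enqueued until the drain returns, and it
stops at the first partial write. It assumes every producer also
synchronizes on the same queue object before adding to it. BoundedDrain
and LimitedChannel are hypothetical names; LimitedChannel only stands in
for a non-blocking socket whose send buffer fills up.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayDeque;
import java.util.Queue;

public class BoundedDrain {

    /** Hypothetical stand-in for a socket: accepts at most `capacity` bytes in total. */
    public static final class LimitedChannel implements WritableByteChannel {
        private int capacity;

        public LimitedChannel(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public int write(ByteBuffer src) {
            int n = Math.min(capacity, src.remaining());
            src.position(src.position() + n); // consume n bytes, like a partial socket write
            capacity -= n;
            return n;
        }

        @Override
        public boolean isOpen() {
            return true;
        }

        @Override
        public void close() {
        }
    }

    /**
     * Drains the queue while holding its lock and stops at the first
     * partial write. Returns the number of buffers written completely.
     */
    public static int drain(Queue<ByteBuffer> queue, WritableByteChannel channel)
            throws IOException {
        synchronized (queue) { // assumption: producers lock on `queue` too
            int completed = 0;
            ByteBuffer buf;
            while ((buf = queue.peek()) != null) {
                channel.write(buf);
                if (buf.hasRemaining()) {
                    // Partial write: treat the channel as congested and give up
                    // until the next write-readiness notification.
                    break;
                }
                queue.remove();
                completed++;
            }
            return completed;
        }
    }

    public static void main(String[] args) throws IOException {
        Queue<ByteBuffer> queue = new ArrayDeque<>();
        queue.add(ByteBuffer.wrap(new byte[4]));
        queue.add(ByteBuffer.wrap(new byte[4]));
        queue.add(ByteBuffer.wrap(new byte[4]));

        int done = drain(queue, new LimitedChannel(10));
        // prints "2 complete, 1 left, 2 bytes pending in head"
        System.out.println(done + " complete, " + queue.size() + " left, "
                + queue.peek().remaining() + " bytes pending in head");
    }
}
```

The trade-off is that producers block for the duration of the drain.
That is short when the channel is non-blocking, but it is still a
design choice rather than a requirement.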
