onMessageSent is invoked when ONE message has been sent, not when all messages have been sent.

Anyway, I found an approach that makes it work: I keep track of the number of 
messages being sent - increase the counter by one before calling filterWrite and 
decrease it by one when messageSent is invoked. Then testing whether the 
IoSession is ready to write data is just a matter of checking sendingMessages == 0.

============       code begin      ================================

package com.banckle.rtmp.util;

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.DefaultWriteFuture;
import org.apache.mina.core.future.WriteFuture;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.write.DefaultWriteRequest;
import org.apache.mina.core.write.WriteRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrioritizedIoFilter extends IoFilterAdapter {

        private static final Logger log = 
LoggerFactory.getLogger(PrioritizedIoFilter.class);

        private static final Object MESSAGE_QUEUE_KEY = new Object();

        private static class MessageQueue {
                AtomicInteger sendingMessages = new AtomicInteger(0);
                PrioritizedQueue<WriteRequest> queue = new 
PrioritizedQueue<WriteRequest>();
                
                public boolean isSending() {
                        return sendingMessages.get() == 0 && queue.isEmpty();
                }
        }

        private static final MessageQueue getWriteQueue(IoSession session) {
                return (MessageQueue) session.getAttribute(MESSAGE_QUEUE_KEY);
        }

        private void internalWriteMessage(NextFilter nextFilter, IoSession 
session, MessageQueue mq, WriteRequest writeRequest) {
                mq.sendingMessages.incrementAndGet();
                nextFilter.filterWrite(session, writeRequest);
        }

        @Override
        public void filterWrite(NextFilter nextFilter, IoSession session, 
WriteRequest writeRequest) throws Exception {

                // the message queue
                MessageQueue mq = getWriteQueue(session);
                if (!(writeRequest.getMessage() instanceof PrioritizedMessage)) 
{
                        // just forward it to next filter
                        internalWriteMessage(nextFilter, session, mq, 
writeRequest);
                        return;
                }

                // unpack the message
                PrioritizedMessage msg = (PrioritizedMessage) 
writeRequest.getMessage();

                Object actualMessage = msg.getActualMessage();
                int priority = msg.getPriority();

                // create the unpacked write request
                WriteFuture future = new DefaultWriteFuture(session);
                WriteRequest unpackedRequest = new 
DefaultWriteRequest(actualMessage, future, writeRequest.getDestination());

                // send it directly if there's no message in the queue, 
otherwise put it
                // into the queue
                if (mq.isSending()) {
                        log.info("Thd({}) Message {} put into prioritized 
queue", Thread.currentThread().getName(), actualMessage);
                        mq.queue.offer(unpackedRequest, priority);
                } else {
                        log.info("Thd({}) Message {} sent directly", 
Thread.currentThread().getName(), actualMessage);
                        internalWriteMessage(nextFilter, session, mq, 
unpackedRequest);
                }
        }

        @Override
        public void messageSent(NextFilter nextFilter, IoSession session, 
WriteRequest writeRequest) throws Exception {
                super.messageSent(nextFilter, session, writeRequest);

                // the message queue
                MessageQueue mq = getWriteQueue(session);
                if (mq.sendingMessages.decrementAndGet() == 0) {
                        WriteRequest req = mq.queue.poll();
                        if (req != null) {
                                internalWriteMessage(nextFilter, session, mq, 
req);
                        }
                }
        }

        @Override
        public void sessionCreated(NextFilter nextFilter, IoSession session) 
throws Exception {
                session.setAttribute(MESSAGE_QUEUE_KEY, new MessageQueue());
                super.sessionCreated(nextFilter, session);
        }
}


======================     code end    =========================


Thanks


--------------------------------------------------
From: "Emmanuel Lécharny" <[email protected]>
Sent: Wednesday, December 16, 2009 4:42 PM
To: <[email protected]>
Subject: Re: How do I test whether a IoSession is ready for write?

> [email protected] a écrit :
>> I want to know whether an IoSession is ready for write: that is, the 
>> session's writing queue is empty and the last write request is completed. I 
>> want to use IoSession.isWriterIdle but I'm afraid the method returns true 
>> even if the queue is not empty, just because no message is being sent. Am I 
>> correct? If not, what would be the proper method to invoke?
>>   
> I'm not 100% sure, but isn't the messageSent event propagated only when 
> the message has been fully written?
> 
> Otherwise, give me a couple of hours to check the code and give you an 
> answer.
>> Best regards and many thanks
>> Wei 
> 
> 

Reply via email to